Always wake up readers after writing.
[free-sw/xcb/libxcb] / src / xcb_in.c
1 /* Copyright (C) 2001-2004 Bart Massey and Jamey Sharp.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a
4  * copy of this software and associated documentation files (the "Software"),
5  * to deal in the Software without restriction, including without limitation
6  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
7  * and/or sell copies of the Software, and to permit persons to whom the
8  * Software is furnished to do so, subject to the following conditions:
9  * 
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  * 
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
17  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
18  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
19  * 
20  * Except as contained in this notice, the names of the authors or their
21  * institutions shall not be used in advertising or otherwise to promote the
22  * sale, use or other dealings in this Software without prior written
23  * authorization from the authors.
24  */
25
26 /* Stuff that reads stuff from the server. */
27
28 #include <assert.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <errno.h>
34
35 #include "xcb.h"
36 #include "xcbext.h"
37 #include "xcbint.h"
38 #if USE_POLL
39 #include <poll.h>
40 #else
41 #include <sys/select.h>
42 #endif
43
44 #define XCB_ERROR 0
45 #define XCB_REPLY 1
46 #define XCB_XGE_EVENT 35
47
48 struct event_list {
49     xcb_generic_event_t *event;
50     struct event_list *next;
51 };
52
53 struct reply_list {
54     void *reply;
55     struct reply_list *next;
56 };
57
58 typedef struct pending_reply {
59     uint64_t first_request;
60     uint64_t last_request;
61     enum workarounds workaround;
62     int flags;
63     struct pending_reply *next;
64 } pending_reply;
65
66 typedef struct reader_list {
67     unsigned int request;
68     pthread_cond_t *data;
69     struct reader_list *next;
70 } reader_list;
71
72 static int read_packet(xcb_connection_t *c)
73 {
74     xcb_generic_reply_t genrep;
75     int length = 32;
76     int eventlength = 0; /* length after first 32 bytes for GenericEvents */
77     void *buf;
78     pending_reply *pend = 0;
79     struct event_list *event;
80
81     /* Wait for there to be enough data for us to read a whole packet */
82     if(c->in.queue_len < length)
83         return 0;
84
85     /* Get the response type, length, and sequence number. */
86     memcpy(&genrep, c->in.queue, sizeof(genrep));
87
88     /* Compute 32-bit sequence number of this packet. */
89     if((genrep.response_type & 0x7f) != XCB_KEYMAP_NOTIFY)
90     {
91         uint64_t lastread = c->in.request_read;
92         c->in.request_read = (lastread & UINT64_C(0xffffffffffff0000)) | genrep.sequence;
93         if(XCB_SEQUENCE_COMPARE(c->in.request_read, <, lastread))
94             c->in.request_read += 0x10000;
95         if(XCB_SEQUENCE_COMPARE(c->in.request_read, >, c->in.request_expected))
96             c->in.request_expected = c->in.request_read;
97
98         if(c->in.request_read != lastread)
99         {
100             if(c->in.current_reply)
101             {
102                 _xcb_map_put(c->in.replies, lastread, c->in.current_reply);
103                 c->in.current_reply = 0;
104                 c->in.current_reply_tail = &c->in.current_reply;
105             }
106             c->in.request_completed = c->in.request_read - 1;
107         }
108
109         while(c->in.pending_replies && 
110               c->in.pending_replies->workaround != WORKAROUND_EXTERNAL_SOCKET_OWNER &&
111               XCB_SEQUENCE_COMPARE (c->in.pending_replies->last_request, <=, c->in.request_completed))
112         {
113             pending_reply *oldpend = c->in.pending_replies;
114             c->in.pending_replies = oldpend->next;
115             if(!oldpend->next)
116                 c->in.pending_replies_tail = &c->in.pending_replies;
117             free(oldpend);
118         }
119
120         if(genrep.response_type == XCB_ERROR)
121             c->in.request_completed = c->in.request_read;
122     }
123
124     if(genrep.response_type == XCB_ERROR || genrep.response_type == XCB_REPLY)
125     {
126         pend = c->in.pending_replies;
127         if(pend &&
128            !(XCB_SEQUENCE_COMPARE(pend->first_request, <=, c->in.request_read) &&
129              (pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER ||
130               XCB_SEQUENCE_COMPARE(c->in.request_read, <=, pend->last_request))))
131             pend = 0;
132     }
133
134     /* For reply packets, check that the entire packet is available. */
135     if(genrep.response_type == XCB_REPLY)
136     {
137         if(pend && pend->workaround == WORKAROUND_GLX_GET_FB_CONFIGS_BUG)
138         {
139             uint32_t *p = (uint32_t *) c->in.queue;
140             genrep.length = p[2] * p[3] * 2;
141         }
142         length += genrep.length * 4;
143     }
144
145     /* XGE events may have sizes > 32 */
146     if (genrep.response_type == XCB_XGE_EVENT)
147         eventlength = genrep.length * 4;
148
149     buf = malloc(length + eventlength +
150             (genrep.response_type == XCB_REPLY ? 0 : sizeof(uint32_t)));
151     if(!buf)
152     {
153         _xcb_conn_shutdown(c);
154         return 0;
155     }
156
157     if(_xcb_in_read_block(c, buf, length) <= 0)
158     {
159         free(buf);
160         return 0;
161     }
162
163     /* pull in XGE event data if available, append after event struct */
164     if (eventlength)
165     {
166         if(_xcb_in_read_block(c, &((xcb_generic_event_t*)buf)[1], eventlength) <= 0)
167         {
168             free(buf);
169             return 0;
170         }
171     }
172
173     if(pend && (pend->flags & XCB_REQUEST_DISCARD_REPLY))
174     {
175         free(buf);
176         return 1;
177     }
178
179     if(genrep.response_type != XCB_REPLY)
180         ((xcb_generic_event_t *) buf)->full_sequence = c->in.request_read;
181
182     /* reply, or checked error */
183     if( genrep.response_type == XCB_REPLY ||
184        (genrep.response_type == XCB_ERROR && pend && (pend->flags & XCB_REQUEST_CHECKED)))
185     {
186         reader_list *reader;
187         struct reply_list *cur = malloc(sizeof(struct reply_list));
188         if(!cur)
189         {
190             _xcb_conn_shutdown(c);
191             free(buf);
192             return 0;
193         }
194         cur->reply = buf;
195         cur->next = 0;
196         *c->in.current_reply_tail = cur;
197         c->in.current_reply_tail = &cur->next;
198         for(reader = c->in.readers; 
199             reader && 
200             XCB_SEQUENCE_COMPARE_32(reader->request, <=, c->in.request_read);
201             reader = reader->next)
202         {
203             if(XCB_SEQUENCE_COMPARE_32(reader->request, ==, c->in.request_read))
204             {
205                 pthread_cond_signal(reader->data);
206                 break;
207             }
208         }
209         return 1;
210     }
211
212     /* event, or unchecked error */
213     event = malloc(sizeof(struct event_list));
214     if(!event)
215     {
216         _xcb_conn_shutdown(c);
217         free(buf);
218         return 0;
219     }
220     event->event = buf;
221     event->next = 0;
222     *c->in.events_tail = event;
223     c->in.events_tail = &event->next;
224     pthread_cond_signal(&c->in.event_cond);
225     return 1; /* I have something for you... */
226 }
227
228 static xcb_generic_event_t *get_event(xcb_connection_t *c)
229 {
230     struct event_list *cur = c->in.events;
231     xcb_generic_event_t *ret;
232     if(!c->in.events)
233         return 0;
234     ret = cur->event;
235     c->in.events = cur->next;
236     if(!cur->next)
237         c->in.events_tail = &c->in.events;
238     free(cur);
239     return ret;
240 }
241
242 static void free_reply_list(struct reply_list *head)
243 {
244     while(head)
245     {
246         struct reply_list *cur = head;
247         head = cur->next;
248         free(cur->reply);
249         free(cur);
250     }
251 }
252
253 static int read_block(const int fd, void *buf, const ssize_t len)
254 {
255     int done = 0;
256     while(done < len)
257     {
258         int ret = read(fd, ((char *) buf) + done, len - done);
259         if(ret > 0)
260             done += ret;
261         if(ret < 0 && errno == EAGAIN)
262         {
263 #if USE_POLL
264             struct pollfd pfd;
265             pfd.fd = fd;
266             pfd.events = POLLIN;
267             pfd.revents = 0;
268             do {
269                 ret = poll(&pfd, 1, -1);
270             } while (ret == -1 && errno == EINTR);
271 #else
272             fd_set fds;
273             FD_ZERO(&fds);
274             FD_SET(fd, &fds);
275             do {
276                 ret = select(fd + 1, &fds, 0, 0, 0);
277             } while (ret == -1 && errno == EINTR);
278 #endif
279         }
280         if(ret <= 0)
281             return ret;
282     }
283     return len;
284 }
285
286 static int poll_for_reply(xcb_connection_t *c, unsigned int request, void **reply, xcb_generic_error_t **error)
287 {
288     struct reply_list *head;
289
290     /* If an error occurred when issuing the request, fail immediately. */
291     if(!request)
292         head = 0;
293     /* We've read requests past the one we want, so if it has replies we have
294      * them all and they're in the replies map. */
295     else if(XCB_SEQUENCE_COMPARE_32(request, <, c->in.request_read))
296     {
297         head = _xcb_map_remove(c->in.replies, request);
298         if(head && head->next)
299             _xcb_map_put(c->in.replies, request, head->next);
300     }
301     /* We're currently processing the responses to the request we want, and we
302      * have a reply ready to return. So just return it without blocking. */
303     else if(XCB_SEQUENCE_COMPARE_32(request, ==, c->in.request_read) && c->in.current_reply)
304     {
305         head = c->in.current_reply;
306         c->in.current_reply = head->next;
307         if(!head->next)
308             c->in.current_reply_tail = &c->in.current_reply;
309     }
310     /* We know this request can't have any more replies, and we've already
311      * established it doesn't have a reply now. Don't bother blocking. */
312     else if(XCB_SEQUENCE_COMPARE_32(request, ==, c->in.request_completed))
313         head = 0;
314     /* We may have more replies on the way for this request: block until we're
315      * sure. */
316     else
317         return 0;
318
319     if(error)
320         *error = 0;
321     *reply = 0;
322
323     if(head)
324     {
325         if(((xcb_generic_reply_t *) head->reply)->response_type == XCB_ERROR)
326         {
327             if(error)
328                 *error = head->reply;
329             else
330                 free(head->reply);
331         }
332         else
333             *reply = head->reply;
334
335         free(head);
336     }
337
338     return 1;
339 }
340
341 /* Public interface */
342
343 void *xcb_wait_for_reply(xcb_connection_t *c, unsigned int request, xcb_generic_error_t **e)
344 {
345     uint64_t widened_request;
346     void *ret = 0;
347     if(e)
348         *e = 0;
349     if(c->has_error)
350         return 0;
351
352     pthread_mutex_lock(&c->iolock);
353
354     widened_request = (c->out.request & UINT64_C(0xffffffff00000000)) | request;
355     if(widened_request > c->out.request)
356         widened_request -= UINT64_C(1) << 32;
357
358     /* If this request has not been written yet, write it. */
359     if(c->out.return_socket || _xcb_out_flush_to(c, widened_request))
360     {
361         pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
362         reader_list reader;
363         reader_list **prev_reader;
364
365         for(prev_reader = &c->in.readers; 
366             *prev_reader && 
367             XCB_SEQUENCE_COMPARE_32((*prev_reader)->request, <=, request);
368             prev_reader = &(*prev_reader)->next)
369         {
370             /* empty */;
371         }
372         reader.request = request;
373         reader.data = &cond;
374         reader.next = *prev_reader;
375         *prev_reader = &reader;
376
377         while(!poll_for_reply(c, request, &ret, e))
378             if(!_xcb_conn_wait(c, &cond, 0, 0))
379                 break;
380
381         for(prev_reader = &c->in.readers;
382             *prev_reader && 
383             XCB_SEQUENCE_COMPARE_32((*prev_reader)->request, <=, request);
384             prev_reader = &(*prev_reader)->next)
385         {
386             if(*prev_reader == &reader)
387             {
388                 *prev_reader = (*prev_reader)->next;
389                 break;
390             }
391         }
392         pthread_cond_destroy(&cond);
393     }
394
395     _xcb_in_wake_up_next_reader(c);
396     pthread_mutex_unlock(&c->iolock);
397     return ret;
398 }
399
400 static void insert_pending_discard(xcb_connection_t *c, pending_reply **prev_next, uint64_t seq)
401 {
402     pending_reply *pend;
403     pend = malloc(sizeof(*pend));
404     if(!pend)
405     {
406         _xcb_conn_shutdown(c);
407         return;
408     }
409
410     pend->first_request = seq;
411     pend->last_request = seq;
412     pend->workaround = 0;
413     pend->flags = XCB_REQUEST_DISCARD_REPLY;
414     pend->next = *prev_next;
415     *prev_next = pend;
416
417     if(!pend->next)
418         c->in.pending_replies_tail = &pend->next;
419 }
420
421 static void discard_reply(xcb_connection_t *c, unsigned int request)
422 {
423     pending_reply *pend = 0;
424     pending_reply **prev_pend;
425     uint64_t widened_request;
426
427     /* We've read requests past the one we want, so if it has replies we have
428      * them all and they're in the replies map. */
429     if(XCB_SEQUENCE_COMPARE_32(request, <, c->in.request_read))
430     {
431         struct reply_list *head;
432         head = _xcb_map_remove(c->in.replies, request);
433         while (head)
434         {
435             struct reply_list *next = head->next;
436             free(head->reply);
437             free(head);
438             head = next;
439         }
440         return;
441     }
442
443     /* We're currently processing the responses to the request we want, and we
444      * have a reply ready to return. Free it, and mark the pend to free any further
445      * replies. */
446     if(XCB_SEQUENCE_COMPARE_32(request, ==, c->in.request_read) && c->in.current_reply)
447     {
448         struct reply_list *head;
449         head = c->in.current_reply;
450         c->in.current_reply = NULL;
451         c->in.current_reply_tail = &c->in.current_reply;
452         while (head)
453         {
454             struct reply_list *next = head->next;
455             free(head->reply);
456             free(head);
457             head = next;
458         }
459
460         pend = c->in.pending_replies;
461         if(pend &&
462             !(XCB_SEQUENCE_COMPARE(pend->first_request, <=, c->in.request_read) &&
463              (pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER ||
464               XCB_SEQUENCE_COMPARE(c->in.request_read, <=, pend->last_request))))
465             pend = 0;
466         if(pend)
467             pend->flags |= XCB_REQUEST_DISCARD_REPLY;
468         else
469             insert_pending_discard(c, &c->in.pending_replies, c->in.request_read);
470
471         return;
472     }
473
474     /* Walk the list of pending requests. Mark the first match for deletion. */
475     for(prev_pend = &c->in.pending_replies; *prev_pend; prev_pend = &(*prev_pend)->next)
476     {
477         if(XCB_SEQUENCE_COMPARE_32((*prev_pend)->first_request, >, request))
478             break;
479
480         if(XCB_SEQUENCE_COMPARE_32((*prev_pend)->first_request, ==, request))
481         {
482             /* Pending reply found. Mark for discard: */
483             (*prev_pend)->flags |= XCB_REQUEST_DISCARD_REPLY;
484             return;
485         }
486     }
487
488     /* Pending reply not found (likely due to _unchecked request). Create one: */
489     widened_request = (c->out.request & UINT64_C(0xffffffff00000000)) | request;
490     if(widened_request > c->out.request)
491         widened_request -= UINT64_C(1) << 32;
492
493     insert_pending_discard(c, prev_pend, widened_request);
494 }
495
496 void xcb_discard_reply(xcb_connection_t *c, unsigned int sequence)
497 {
498     if(c->has_error)
499         return;
500
501     /* If an error occurred when issuing the request, fail immediately. */
502     if(!sequence)
503         return;
504
505     pthread_mutex_lock(&c->iolock);
506     discard_reply(c, sequence);
507     pthread_mutex_unlock(&c->iolock);
508 }
509
510 int xcb_poll_for_reply(xcb_connection_t *c, unsigned int request, void **reply, xcb_generic_error_t **error)
511 {
512     int ret;
513     if(c->has_error)
514     {
515         *reply = 0;
516         if(error)
517             *error = 0;
518         return 1; /* would not block */
519     }
520     assert(reply != 0);
521     pthread_mutex_lock(&c->iolock);
522     ret = poll_for_reply(c, request, reply, error);
523     pthread_mutex_unlock(&c->iolock);
524     return ret;
525 }
526
527 xcb_generic_event_t *xcb_wait_for_event(xcb_connection_t *c)
528 {
529     xcb_generic_event_t *ret;
530     if(c->has_error)
531         return 0;
532     pthread_mutex_lock(&c->iolock);
533     /* get_event returns 0 on empty list. */
534     while(!(ret = get_event(c)))
535         if(!_xcb_conn_wait(c, &c->in.event_cond, 0, 0))
536             break;
537
538     _xcb_in_wake_up_next_reader(c);
539     pthread_mutex_unlock(&c->iolock);
540     return ret;
541 }
542
543 xcb_generic_event_t *xcb_poll_for_event(xcb_connection_t *c)
544 {
545     xcb_generic_event_t *ret = 0;
546     if(!c->has_error)
547     {
548         pthread_mutex_lock(&c->iolock);
549         /* FIXME: follow X meets Z architecture changes. */
550         ret = get_event(c);
551         if(!ret && _xcb_in_read(c)) /* _xcb_in_read shuts down the connection on error */
552             ret = get_event(c);
553         pthread_mutex_unlock(&c->iolock);
554     }
555     return ret;
556 }
557
558 xcb_generic_error_t *xcb_request_check(xcb_connection_t *c, xcb_void_cookie_t cookie)
559 {
560     /* FIXME: this could hold the lock to avoid syncing unnecessarily, but
561      * that would require factoring the locking out of xcb_get_input_focus,
562      * xcb_get_input_focus_reply, and xcb_wait_for_reply. */
563     xcb_generic_error_t *ret;
564     void *reply;
565     if(c->has_error)
566         return 0;
567     if(XCB_SEQUENCE_COMPARE_32(cookie.sequence,>,c->in.request_expected)
568        && XCB_SEQUENCE_COMPARE_32(cookie.sequence,>,c->in.request_completed))
569     {
570         free(xcb_get_input_focus_reply(c, xcb_get_input_focus(c), &ret));
571         assert(!ret);
572     }
573     reply = xcb_wait_for_reply(c, cookie.sequence, &ret);
574     assert(!reply);
575     return ret;
576 }
577
578 /* Private interface */
579
580 int _xcb_in_init(_xcb_in *in)
581 {
582     if(pthread_cond_init(&in->event_cond, 0))
583         return 0;
584     in->reading = 0;
585
586     in->queue_len = 0;
587
588     in->request_read = 0;
589     in->request_completed = 0;
590
591     in->replies = _xcb_map_new();
592     if(!in->replies)
593         return 0;
594
595     in->current_reply_tail = &in->current_reply;
596     in->events_tail = &in->events;
597     in->pending_replies_tail = &in->pending_replies;
598
599     return 1;
600 }
601
602 void _xcb_in_destroy(_xcb_in *in)
603 {
604     pthread_cond_destroy(&in->event_cond);
605     free_reply_list(in->current_reply);
606     _xcb_map_delete(in->replies, (void (*)(void *)) free_reply_list);
607     while(in->events)
608     {
609         struct event_list *e = in->events;
610         in->events = e->next;
611         free(e->event);
612         free(e);
613     }
614     while(in->pending_replies)
615     {
616         pending_reply *pend = in->pending_replies;
617         in->pending_replies = pend->next;
618         free(pend);
619     }
620 }
621
622 void _xcb_in_wake_up_next_reader(xcb_connection_t *c)
623 {
624     int pthreadret;
625     if(c->in.readers)
626         pthreadret = pthread_cond_signal(c->in.readers->data);
627     else
628         pthreadret = pthread_cond_signal(&c->in.event_cond);
629     assert(pthreadret == 0);
630 }
631
632 int _xcb_in_expect_reply(xcb_connection_t *c, uint64_t request, enum workarounds workaround, int flags)
633 {
634     pending_reply *pend = malloc(sizeof(pending_reply));
635     assert(workaround != WORKAROUND_NONE || flags != 0);
636     if(!pend)
637     {
638         _xcb_conn_shutdown(c);
639         return 0;
640     }
641     pend->first_request = pend->last_request = request;
642     pend->workaround = workaround;
643     pend->flags = flags;
644     pend->next = 0;
645     *c->in.pending_replies_tail = pend;
646     c->in.pending_replies_tail = &pend->next;
647     return 1;
648 }
649
650 void _xcb_in_replies_done(xcb_connection_t *c)
651 {
652     struct pending_reply *pend;
653     if (c->in.pending_replies_tail != &c->in.pending_replies)
654     {
655         pend = container_of(c->in.pending_replies_tail, struct pending_reply, next);
656         if(pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER)
657         {
658             pend->last_request = c->out.request;
659             pend->workaround = WORKAROUND_NONE;
660         }
661     }
662 }
663
664 int _xcb_in_read(xcb_connection_t *c)
665 {
666     int n = read(c->fd, c->in.queue + c->in.queue_len, sizeof(c->in.queue) - c->in.queue_len);
667     if(n > 0)
668         c->in.queue_len += n;
669     while(read_packet(c))
670         /* empty */;
671     if((n > 0) || (n < 0 && errno == EAGAIN))
672         return 1;
673     _xcb_conn_shutdown(c);
674     return 0;
675 }
676
677 int _xcb_in_read_block(xcb_connection_t *c, void *buf, int len)
678 {
679     int done = c->in.queue_len;
680     if(len < done)
681         done = len;
682
683     memcpy(buf, c->in.queue, done);
684     c->in.queue_len -= done;
685     memmove(c->in.queue, c->in.queue + done, c->in.queue_len);
686
687     if(len > done)
688     {
689         int ret = read_block(c->fd, (char *) buf + done, len - done);
690         if(ret <= 0)
691         {
692             _xcb_conn_shutdown(c);
693             return ret;
694         }
695     }
696
697     return len;
698 }