File: | aio/aio.c |
Location: | line 252, column 9 |
Description: | Assigned value is garbage or undefined |
1 | #include <aio.h> | |||
2 | #include <pthread__pthread.h> | |||
3 | #include <semaphore.h> | |||
4 | #include <limits.h> | |||
5 | #include <errno(*__errno_location()).h> | |||
6 | #include <unistd.h> | |||
7 | #include <stdlib.h> | |||
8 | #include "syscall.h" | |||
9 | #include "atomic.h" | |||
10 | #include "libc.h" | |||
11 | #include "pthread_impl.h" | |||
12 | ||||
13 | /* The following is a threads-based implementation of AIO with minimal | |||
14 | * dependence on implementation details. Most synchronization is | |||
15 | * performed with pthread primitives, but atomics and futex operations | |||
16 | * are used for notification in a couple places where the pthread | |||
17 | * primitives would be inefficient or impractical. | |||
18 | * | |||
19 | * For each fd with outstanding aio operations, an aio_queue structure | |||
20 | * is maintained. These are reference-counted and destroyed by the last | |||
21 | * aio worker thread to exit. Accessing any member of the aio_queue | |||
22 | * structure requires a lock on the aio_queue. Adding and removing aio | |||
23 | * queues themselves requires a write lock on the global map object, | |||
24 | * a 4-level table mapping file descriptor numbers to aio queues. A | |||
25 | * read lock on the map is used to obtain locks on existing queues by | |||
26 | * excluding destruction of the queue by a different thread while it is | |||
27 | * being locked. | |||
28 | * | |||
29 | * Each aio queue has a list of active threads/operations. Presently there | |||
30 | * is a one to one relationship between threads and operations. The only | |||
31 | * members of the aio_thread structure which are accessed by other threads | |||
32 | * are the linked list pointers, op (which is immutable), running (which | |||
33 | * is updated atomically), and err (which is synchronized via running), | |||
34 | * so no locking is necessary. Most of the other members are used | |||
35 | * for sharing data between the main flow of execution and cancellation | |||
36 | * cleanup handler. | |||
37 | * | |||
38 | * Taking any aio locks requires having all signals blocked. This is | |||
39 | * necessary because aio_cancel is needed by close, and close is required | |||
40 | * to be async-signal safe. All aio worker threads run with all signals | |||
41 | * blocked permanently. | |||
42 | */ | |||
43 | ||||
/* Startup handshake between submit() and the worker thread it creates.
 * The object lives on submit()'s stack, so the worker copies what it
 * needs out of it before posting sem. */
struct aio_args {
	struct aiocb *cb;	/* control block of the request being submitted */
	int op;			/* operation code passed to submit() */
	int err;		/* set by the worker: 0, or an errno value on startup failure */
	sem_t sem;		/* posted by the worker once err is valid */
};
50 | ||||
/* Per-operation state. One instance lives on the stack of each worker
 * thread and is linked into its queue's list while the operation runs. */
struct aio_thread {
	pthread_t td;			/* the worker thread itself (target for pthread_cancel) */
	struct aiocb *cb;		/* request this worker is servicing */
	struct aio_thread *next;	/* next entry in the queue's list */
	struct aio_thread *prev;	/* previous entry; null for the list head */
	struct aio_queue *q;		/* queue this operation belongs to */
	volatile int running;		/* 1 while running, -1 once a canceller waits on it,
					 * 0 after cleanup() has published the result */
	int err;			/* final error status; starts as ECANCELED */
	int op;				/* operation code */
	ssize_t ret;			/* final return value; starts as -1 */
};
60 | ||||
/* Per-file-descriptor bookkeeping shared by all operations on that fd.
 * Members are accessed with lock held. */
struct aio_queue {
	int fd;			/* descriptor this queue serves */
	int seekable;		/* nonzero if lseek(fd, 0, SEEK_CUR) succeeded */
	int append;		/* nonzero if not seekable or O_APPEND is set */
	int ref;		/* reference count, one per linked worker */
	int init;		/* nonzero once seekable/append have been probed */
	pthread_mutex_t lock;	/* protects this structure and the list below */
	pthread_cond_t cond;	/* broadcast whenever an operation leaves the list */
	struct aio_thread *head;	/* most recently added operation */
};
67 | ||||
68 | static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER{{{0}}}; | |||
69 | static struct aio_queue *****map; | |||
70 | static volatile int aio_fd_cnt; | |||
71 | volatile int __aio_fut; | |||
72 | ||||
73 | static struct aio_queue *__aio_get_queue(int fd, int need) | |||
74 | { | |||
75 | if (fd < 0) return 0; | |||
76 | int a=fd>>24; | |||
77 | unsigned char b=fd>>16, c=fd>>8, d=fd; | |||
78 | struct aio_queue *q = 0; | |||
79 | pthread_rwlock_rdlock(&maplock); | |||
80 | if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) { | |||
81 | pthread_rwlock_unlock(&maplock); | |||
82 | pthread_rwlock_wrlock(&maplock); | |||
83 | if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24); | |||
84 | if (!map) goto out; | |||
85 | if (!map[a]) map[a] = calloc(sizeof **map, 256); | |||
86 | if (!map[a]) goto out; | |||
87 | if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256); | |||
88 | if (!map[a][b]) goto out; | |||
89 | if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256); | |||
90 | if (!map[a][b][c]) goto out; | |||
91 | if (!(q = map[a][b][c][d])) { | |||
92 | map[a][b][c][d] = q = calloc(sizeof *****map, 1); | |||
93 | if (q) { | |||
94 | q->fd = fd; | |||
95 | pthread_mutex_init(&q->lock, 0); | |||
96 | pthread_cond_init(&q->cond, 0); | |||
97 | a_inc(&aio_fd_cnt); | |||
98 | } | |||
99 | } | |||
100 | } | |||
101 | if (q) pthread_mutex_lock(&q->lock); | |||
102 | out: | |||
103 | pthread_rwlock_unlock(&maplock); | |||
104 | return q; | |||
105 | } | |||
106 | ||||
107 | static void __aio_unref_queue(struct aio_queue *q) | |||
108 | { | |||
109 | if (q->ref > 1) { | |||
110 | q->ref--; | |||
111 | pthread_mutex_unlock(&q->lock); | |||
112 | return; | |||
113 | } | |||
114 | ||||
115 | /* This is potentially the last reference, but a new reference | |||
116 | * may arrive since we cannot free the queue object without first | |||
117 | * taking the maplock, which requires releasing the queue lock. */ | |||
118 | pthread_mutex_unlock(&q->lock); | |||
119 | pthread_rwlock_wrlock(&maplock); | |||
120 | pthread_mutex_lock(&q->lock); | |||
121 | if (q->ref == 1) { | |||
122 | int fd=q->fd; | |||
123 | int a=fd>>24; | |||
124 | unsigned char b=fd>>16, c=fd>>8, d=fd; | |||
125 | map[a][b][c][d] = 0; | |||
126 | a_dec(&aio_fd_cnt); | |||
127 | pthread_rwlock_unlock(&maplock); | |||
128 | pthread_mutex_unlock(&q->lock); | |||
129 | free(q); | |||
130 | } else { | |||
131 | q->ref--; | |||
132 | pthread_rwlock_unlock(&maplock); | |||
133 | pthread_mutex_unlock(&q->lock); | |||
134 | } | |||
135 | } | |||
136 | ||||
137 | static void cleanup(void *ctx) | |||
138 | { | |||
139 | struct aio_thread *at = ctx; | |||
140 | struct aio_queue *q = at->q; | |||
141 | struct aiocb *cb = at->cb; | |||
142 | struct sigevent sev = cb->aio_sigevent; | |||
143 | ||||
144 | /* There are four potential types of waiters we could need to wake: | |||
145 | * 1. Callers of aio_cancel/close. | |||
146 | * 2. Callers of aio_suspend with a single aiocb. | |||
147 | * 3. Callers of aio_suspend with a list. | |||
148 | * 4. AIO worker threads waiting for sequenced operations. | |||
149 | * Types 1-3 are notified via atomics/futexes, mainly for AS-safety | |||
150 | * considerations. Type 4 is notified later via a cond var. */ | |||
151 | ||||
152 | cb->__ret = at->ret; | |||
153 | if (a_swap(&at->running, 0) < 0) | |||
154 | __wake(&at->running, -1, 1); | |||
155 | if (a_swap(&cb->__err, at->err) != EINPROGRESS115) | |||
156 | __wake(&cb->__err, -1, 1); | |||
157 | if (a_swap(&__aio_fut, 0)) | |||
158 | __wake(&__aio_fut, -1, 1); | |||
159 | ||||
160 | pthread_mutex_lock(&q->lock); | |||
161 | ||||
162 | if (at->next) at->next->prev = at->prev; | |||
163 | if (at->prev) at->prev->next = at->next; | |||
164 | else q->head = at->next; | |||
165 | ||||
166 | /* Signal aio worker threads waiting for sequenced operations. */ | |||
167 | pthread_cond_broadcast(&q->cond); | |||
168 | ||||
169 | __aio_unref_queue(q); | |||
170 | ||||
171 | if (sev.sigev_notify == SIGEV_SIGNAL0) { | |||
172 | siginfo_t si = { | |||
173 | .si_signo = sev.sigev_signo, | |||
174 | .si_value__si_fields.__si_common.__second.si_value = sev.sigev_value, | |||
175 | .si_code = SI_ASYNCIO(-4), | |||
176 | .si_pid__si_fields.__si_common.__first.__piduid.si_pid = getpid(), | |||
177 | .si_uid__si_fields.__si_common.__first.__piduid.si_uid = getuid() | |||
178 | }; | |||
179 | __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si)__syscall3(129,((long) (si.__si_fields.__si_common.__first.__piduid .si_pid)),((long) (si.si_signo)),((long) (&si))); | |||
180 | } | |||
181 | if (sev.sigev_notify == SIGEV_THREAD2) { | |||
182 | a_store(&__pthread_self()->cancel, 0); | |||
183 | sev.sigev_notify_function(sev.sigev_value); | |||
184 | } | |||
185 | } | |||
186 | ||||
187 | static void *io_thread_func(void *ctx) | |||
188 | { | |||
189 | struct aio_thread at, *p; | |||
190 | ||||
191 | struct aio_args *args = ctx; | |||
192 | struct aiocb *cb = args->cb; | |||
193 | int fd = cb->aio_fildes; | |||
194 | int op = args->op; | |||
195 | void *buf = (void *)cb->aio_buf; | |||
196 | size_t len = cb->aio_nbytes; | |||
197 | off_t off = cb->aio_offset; | |||
198 | ||||
199 | struct aio_queue *q = __aio_get_queue(fd, 1); | |||
200 | ssize_t ret; | |||
| ||||
201 | ||||
202 | args->err = q ? 0 : EAGAIN11; | |||
203 | sem_post(&args->sem); | |||
204 | if (!q) return 0; | |||
205 | ||||
206 | at.op = op; | |||
207 | at.running = 1; | |||
208 | at.ret = -1; | |||
209 | at.err = ECANCELED125; | |||
210 | at.q = q; | |||
211 | at.td = __pthread_self(); | |||
212 | at.cb = cb; | |||
213 | at.prev = 0; | |||
214 | if ((at.next = q->head)) at.next->prev = &at; | |||
215 | q->head = &at; | |||
216 | q->ref++; | |||
217 | ||||
218 | if (!q->init) { | |||
219 | int seekable = lseek(fd, 0, SEEK_CUR1) >= 0; | |||
220 | q->seekable = seekable; | |||
221 | q->append = !seekable || (fcntl(fd, F_GETFL3) & O_APPEND02000); | |||
222 | q->init = 1; | |||
223 | } | |||
224 | ||||
225 | pthread_cleanup_push(cleanup, &at)do { struct __ptcb __cb; _pthread_cleanup_push(&__cb, cleanup , &at);; | |||
226 | ||||
227 | /* Wait for sequenced operations. */ | |||
228 | if (op!=LIO_READ0 && (op!=LIO_WRITE1 || q->append)) { | |||
229 | for (;;) { | |||
230 | for (p=at.next; p && p->op!=LIO_WRITE1; p=p->next); | |||
231 | if (!p) break; | |||
232 | pthread_cond_wait(&q->cond, &q->lock); | |||
233 | } | |||
234 | } | |||
235 | ||||
236 | pthread_mutex_unlock(&q->lock); | |||
237 | ||||
238 | switch (op) { | |||
239 | case LIO_WRITE1: | |||
240 | ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off); | |||
241 | break; | |||
242 | case LIO_READ0: | |||
243 | ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off); | |||
244 | break; | |||
245 | case O_SYNC04010000: | |||
246 | ret = fsync(fd); | |||
247 | break; | |||
248 | case O_DSYNC010000: | |||
249 | ret = fdatasync(fd); | |||
250 | break; | |||
251 | } | |||
252 | at.ret = ret; | |||
| ||||
253 | at.err = ret<0 ? errno(*__errno_location()) : 0; | |||
254 | ||||
255 | pthread_cleanup_pop(1)_pthread_cleanup_pop(&__cb, (1)); } while(0); | |||
256 | ||||
257 | return 0; | |||
258 | } | |||
259 | ||||
260 | static int submit(struct aiocb *cb, int op) | |||
261 | { | |||
262 | int ret = 0; | |||
263 | pthread_attr_t a; | |||
264 | sigset_t allmask, origmask; | |||
265 | pthread_t td; | |||
266 | struct aio_args args = { .cb = cb, .op = op }; | |||
267 | sem_init(&args.sem, 0, 0); | |||
268 | ||||
269 | if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD2) { | |||
270 | if (cb->aio_sigevent.sigev_notify_attributes) | |||
271 | a = *cb->aio_sigevent.sigev_notify_attributes; | |||
272 | else | |||
273 | pthread_attr_init(&a); | |||
274 | } else { | |||
275 | pthread_attr_init(&a); | |||
276 | pthread_attr_setstacksize(&a, PTHREAD_STACK_MIN2048); | |||
277 | pthread_attr_setguardsize(&a, 0); | |||
278 | } | |||
279 | pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED1); | |||
280 | sigfillset(&allmask); | |||
281 | pthread_sigmask(SIG_BLOCK0, &allmask, &origmask); | |||
282 | cb->__err = EINPROGRESS115; | |||
283 | if (pthread_create(&td, &a, io_thread_func, &args)) { | |||
284 | errno(*__errno_location()) = EAGAIN11; | |||
285 | ret = -1; | |||
286 | } | |||
287 | pthread_sigmask(SIG_SETMASK2, &origmask, 0); | |||
288 | ||||
289 | if (!ret) { | |||
290 | while (sem_wait(&args.sem)); | |||
291 | if (args.err) { | |||
292 | errno(*__errno_location()) = args.err; | |||
293 | ret = -1; | |||
294 | } | |||
295 | } | |||
296 | ||||
297 | return ret; | |||
298 | } | |||
299 | ||||
/* Queue an asynchronous read described by cb.
 * Returns 0 on success, or -1 with errno set if it could not be started. */
int aio_read(struct aiocb *cb)
{
	int status = submit(cb, LIO_READ);
	return status;
}
304 | ||||
/* Queue an asynchronous write described by cb.
 * Returns 0 on success, or -1 with errno set if it could not be started. */
int aio_write(struct aiocb *cb)
{
	int status = submit(cb, LIO_WRITE);
	return status;
}
309 | ||||
310 | int aio_fsync(int op, struct aiocb *cb) | |||
311 | { | |||
312 | if (op != O_SYNC04010000 && op != O_DSYNC010000) { | |||
313 | errno(*__errno_location()) = EINVAL22; | |||
314 | return -1; | |||
315 | } | |||
316 | return submit(cb, op); | |||
317 | } | |||
318 | ||||
319 | ssize_t aio_return(struct aiocb *cb) | |||
320 | { | |||
321 | return cb->__ret; | |||
322 | } | |||
323 | ||||
324 | int aio_error(const struct aiocb *cb) | |||
325 | { | |||
326 | a_barrier(); | |||
327 | return cb->__err & 0x7fffffff; | |||
328 | } | |||
329 | ||||
330 | int aio_cancel(int fd, struct aiocb *cb) | |||
331 | { | |||
332 | sigset_t allmask, origmask; | |||
333 | int ret = AIO_ALLDONE2; | |||
334 | struct aio_thread *p; | |||
335 | struct aio_queue *q; | |||
336 | ||||
337 | /* Unspecified behavior case. Report an error. */ | |||
338 | if (cb && fd != cb->aio_fildes) { | |||
339 | errno(*__errno_location()) = EINVAL22; | |||
340 | return -1; | |||
341 | } | |||
342 | ||||
343 | sigfillset(&allmask); | |||
344 | pthread_sigmask(SIG_BLOCK0, &allmask, &origmask); | |||
345 | ||||
346 | if (!(q = __aio_get_queue(fd, 0))) { | |||
347 | if (fcntl(fd, F_GETFD1) < 0) ret = -1; | |||
348 | goto done; | |||
349 | } | |||
350 | ||||
351 | for (p = q->head; p; p = p->next) { | |||
352 | if (cb && cb != p->cb) continue; | |||
353 | /* Transition target from running to running-with-waiters */ | |||
354 | if (a_cas(&p->running, 1, -1)) { | |||
355 | pthread_cancel(p->td); | |||
356 | __wait(&p->running, 0, -1, 1); | |||
357 | if (p->err == ECANCELED125) ret = AIO_CANCELED0; | |||
358 | } | |||
359 | } | |||
360 | ||||
361 | pthread_mutex_unlock(&q->lock); | |||
362 | done: | |||
363 | pthread_sigmask(SIG_SETMASK2, &origmask, 0); | |||
364 | return ret; | |||
365 | } | |||
366 | ||||
367 | int __aio_close(int fd) | |||
368 | { | |||
369 | a_barrier(); | |||
370 | if (aio_fd_cnt) aio_cancel(fd, 0); | |||
371 | return fd; | |||
372 | } | |||
373 | ||||
/* Large-file aliases: each foo64 symbol is a weak alias for foo. */
LFS64(aio_cancel);
LFS64(aio_error);
LFS64(aio_fsync);
LFS64(aio_read);
LFS64(aio_write);
LFS64(aio_return);