/* * Async syscalls * Copyright (C) Volker Lendecke 2012 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "asys.h" #include #include #include "../pthreadpool/pthreadpool.h" struct asys_pwrite_args { int fildes; const void *buf; size_t nbyte; off_t offset; }; struct asys_pread_args { int fildes; void *buf; size_t nbyte; off_t offset; }; struct asys_fsync_args { int fildes; }; union asys_job_args { struct asys_pwrite_args pwrite_args; struct asys_pread_args pread_args; struct asys_fsync_args fsync_args; }; struct asys_job { void *private_data; union asys_job_args args; ssize_t ret; int err; char busy; char canceled; }; struct asys_context { struct pthreadpool *pool; int pthreadpool_fd; unsigned num_jobs; struct asys_job **jobs; }; struct asys_creds_context { int dummy; }; int asys_context_init(struct asys_context **pctx, unsigned max_parallel) { struct asys_context *ctx; int ret; ctx = calloc(1, sizeof(struct asys_context)); if (ctx == NULL) { return ENOMEM; } ret = pthreadpool_init(max_parallel, &ctx->pool); if (ret != 0) { free(ctx); return ret; } ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool); *pctx = ctx; return 0; } int asys_signalfd(struct asys_context *ctx) { return ctx->pthreadpool_fd; } int asys_context_destroy(struct asys_context *ctx) { int ret; unsigned i; for (i=0; inum_jobs; i++) { if (ctx->jobs[i]->busy) { return EBUSY; } } ret = pthreadpool_destroy(ctx->pool); if (ret != 0) { return ret; } for (i=0; inum_jobs; i++) { free(ctx->jobs[i]); } free(ctx->jobs); free(ctx); return 0; } static int asys_new_job(struct asys_context *ctx, int *jobid, struct asys_job **pjob) { struct asys_job **tmp; struct asys_job *job; unsigned i; for (i=0; inum_jobs; i++) { job = ctx->jobs[i]; if (!job->busy) { job->err = 0; *pjob = job; *jobid = i; return 0; } } if (ctx->num_jobs+1 == 0) { return EBUSY; /* overflow */ } tmp = realloc(ctx->jobs, sizeof(struct asys_job *)*(ctx->num_jobs+1)); if (tmp == NULL) { return ENOMEM; } ctx->jobs = tmp; job = calloc(1, sizeof(struct asys_job)); if (job == NULL) { return ENOMEM; } ctx->jobs[ctx->num_jobs] = job; *jobid = ctx->num_jobs; *pjob = job; ctx->num_jobs += 1; return 0; } static void asys_pwrite_do(void *private_data); int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf, size_t nbyte, off_t offset, void *private_data) { struct asys_job *job; struct asys_pwrite_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.pwrite_args; args->fildes = fildes; args->buf = buf; args->nbyte = nbyte; args->offset = offset; ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_pwrite_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_pwrite_args *args = &job->args.pwrite_args; job->ret = pwrite(args->fildes, args->buf, args->nbyte, args->offset); if (job->ret == -1) { job->err = errno; } } static void asys_pread_do(void *private_data); int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte, off_t offset, void *private_data) { struct asys_job *job; struct asys_pread_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.pread_args; args->fildes = fildes; args->buf = buf; args->nbyte = nbyte; args->offset = offset; ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_pread_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_pread_args *args = &job->args.pread_args; job->ret = pread(args->fildes, args->buf, args->nbyte, args->offset); if (job->ret == -1) { job->err = errno; } } static void asys_fsync_do(void *private_data); int asys_fsync(struct asys_context *ctx, int fildes, void *private_data) { struct asys_job *job; struct asys_fsync_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.fsync_args; args->fildes = fildes; ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_fsync_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_fsync_args *args = &job->args.fsync_args; job->ret = fsync(args->fildes); if (job->ret == -1) { job->err = errno; } } void asys_cancel(struct asys_context *ctx, void *private_data) { unsigned i; for (i=0; inum_jobs; i++) { struct asys_job *job = ctx->jobs[i]; if (job->private_data == private_data) { job->canceled = 1; } } } int asys_results(struct asys_context *ctx, struct asys_result *results, unsigned num_results) { int jobids[num_results]; int i, ret; ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results); if (ret <= 0) { return ret; } for (i=0; i= ctx->num_jobs)) { return -EIO; } job = ctx->jobs[jobid]; if (job->canceled) { result->ret = -1; result->err = ECANCELED; } else { result->ret = job->ret; result->err = job->err; } result->private_data = job->private_data; job->busy = 0; } return ret; }