#include #include #include #include #include #include #include #include #include "storage.h" #include "intf.h" #include "debug.h" static int unmap_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk); static int write_header(int fd, struct tsnif_storage_opts *opts) { int err; struct tsnif_storage_header_mmap header = { .common = { .magic = TSNIF_HEADER_MAGIC, .version = TSNIF_STORAGE_VERSION, .storage_type = TSNIF_STORAGE_TYPE_MMAP, .term_type = opts->term_type, }, .size_max = opts->size_max, }; err = write(fd, &header, sizeof(header)); if (err < 0) return err; return ftruncate(fd, sysconf(_SC_PAGESIZE)); } static int map_header(struct tsnif_storage_handle *h) { void *ptr; int flags = PROT_READ; if (h->opts->flags & TSNIF_STORAGE_OPT_WRITE) flags = PROT_WRITE; ptr = mmap(NULL, sysconf(_SC_PAGESIZE), flags, MAP_SHARED, h->fd, 0); if (MAP_FAILED == ptr) return -1; h->header = ptr; return 0; } static int check_header(struct tsnif_storage_handle *h) { struct tsnif_storage_header_mmap *header = h->header; struct tsnif_storage_header *common = &header->common; if (common->magic != TSNIF_HEADER_MAGIC) { TSNIF_DEBUG(STORAGE, "failed: wrong header magic %x\n", common->magic); return -1; } if (common->version != TSNIF_STORAGE_VERSION) { TSNIF_DEBUG(STORAGE, "failed: wrong storage version %x\n", common->version); return -1; } if (common->storage_type != TSNIF_STORAGE_TYPE_MMAP) { TSNIF_DEBUG(STORAGE, "failed: wrong storage type %x\n", common->storage_type); return -1; } TSNIF_DEBUG(STORAGE, "got terminal type %d\n", common->term_type); return 0; } int tsnif_storage_init(struct tsnif_storage_handle *h, struct tsnif_storage_opts *opts, char *name) { int fd, create, err; int flags = O_RDONLY; int mode = S_IRUSR | S_IWUSR; if (!opts) return -1; memset(h, 0, sizeof(*h)); h->opts = opts; /* mmap page sized chunks as default */ if (!opts->chunk_size) opts->chunk_size = sysconf(_SC_PAGESIZE); create = (opts->flags & TSNIF_STORAGE_OPT_WRITE); if (create) flags |= O_CREAT | O_TRUNC | O_RDWR; fd = open(name, flags, mode); if (fd < 0) { TSNIF_DEBUG(STORAGE, "open failed for '%s': %s\n", name, strerror(errno)); return -1; } TSNIF_DEBUG(STORAGE, "opened '%s'\n", name); if (create && (err = write_header(fd, opts))) { TSNIF_DEBUG(STORAGE, "write failed for '%s': %s\n", name, strerror(errno)); close(fd); return err; } h->fd = fd; err = map_header(h); if (err) { close(fd); return err; } TSNIF_DEBUG(STORAGE, "records count %d\n", h->header->cnt); if (opts->flags & TSNIF_STORAGE_OPT_READ) { struct stat st; err = fstat(fd, &st); if (err) return err; h->file_size = st.st_size; return check_header(h); } return 0; } int tsnif_storage_close(struct tsnif_storage_handle *h) { if (h->header) { TSNIF_DEBUG(STORAGE, "records count %d\n", h->header->cnt); munmap(h->header, sysconf(_SC_PAGESIZE)); } unmap_chunk(h, &h->active_chunk); close(h->fd); return 0; } static int get_open_offset(struct tsnif_storage_handle *h) { return h->header->offset_start; } static int unmap_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk) { if (!chunk->header) return -1; return munmap(chunk->header, h->opts->chunk_size); } enum { STORAGE_MMAP_FIRST, STORAGE_MMAP_NEXT, STORAGE_MMAP_PREV, }; static int file_trunc(struct tsnif_storage_handle *h, off_t new_size) { off_t old_size; TSNIF_DEBUG(STORAGE, "new size %d\n", new_size); old_size = lseek(h->fd, 0, SEEK_END); if (old_size < 0) return -1; TSNIF_DEBUG(STORAGE, "old size %d\n", old_size); if (old_size >= new_size) return 0; TSNIF_DEBUG(STORAGE, "truncating to %d\n", new_size); return ftruncate(h->fd, new_size); } static int map_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk, int what) { void *ptr; off_t offset = 0; int err, flags = PROT_READ; TSNIF_DEBUG(STORAGE, "what %d\n", what); if (what == STORAGE_MMAP_FIRST) { if (h->opts->flags & TSNIF_STORAGE_OPT_READ) offset = get_open_offset(h); else { offset = sysconf(_SC_PAGESIZE); h->header->offset_start = offset; } } if (what == STORAGE_MMAP_NEXT) { offset = chunk->offset + h->opts->chunk_size; if (h->opts->flags & TSNIF_STORAGE_OPT_READ) { TSNIF_DEBUG(STORAGE, "file size 0x%x\n", h->file_size); TSNIF_DEBUG(STORAGE, "offset 0x%x\n", offset); if (h->file_size == offset) offset = sysconf(_SC_PAGESIZE); TSNIF_DEBUG(STORAGE, "offset 0x%x\n", offset); TSNIF_DEBUG(STORAGE, "start offset 0x%x\n", h->header->offset_start); if (h->header->offset_start == offset) return TSNIF_STORAGE_READ_EOF; } else { /* check the max size only in write mode */ if (offset > h->opts->size_max) offset = sysconf(_SC_PAGESIZE); } TSNIF_DEBUG(STORAGE, "chunk offset 0x%x\n", chunk->offset); TSNIF_DEBUG(STORAGE, "chunk size 0x%x\n", chunk->offset); TSNIF_DEBUG(STORAGE, "max size 0x%x\n", h->opts->size_max); } if (what == STORAGE_MMAP_PREV) { if (h->header->offset_start == chunk->offset) return TSNIF_STORAGE_READ_EOF; if (chunk->offset == sysconf(_SC_PAGESIZE)) offset = h->file_size - h->opts->chunk_size; else offset = chunk->offset - h->opts->chunk_size; TSNIF_DEBUG(STORAGE, "chunk offset 0x%x\n", chunk->offset); TSNIF_DEBUG(STORAGE, "chunk size 0x%x\n", chunk->offset); TSNIF_DEBUG(STORAGE, "max size 0x%x\n", h->opts->size_max); } TSNIF_DEBUG(STORAGE, "new offset 0x%x, old offset 0x%x\n", offset, chunk->offset); if (chunk->header) unmap_chunk(h, chunk); if (h->opts->flags & TSNIF_STORAGE_OPT_WRITE) { err = file_trunc(h, offset + h->opts->chunk_size); if (err) return err; flags = PROT_WRITE; } ptr = mmap(NULL, h->opts->chunk_size, flags, MAP_SHARED, h->fd, offset); if (MAP_FAILED == ptr) return -1; TSNIF_DEBUG(STORAGE, "mmap-ed offset %ld, size %d\n", offset, h->opts->chunk_size); chunk->offset = offset; chunk->header = ptr; chunk->current_data = chunk->header->rec0; chunk->current_index = (void*) (chunk->header) + h->opts->chunk_size - sizeof(uint32_t); return 0; } static int clear_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk) { struct tsnif_storage_chunk_header *header = chunk->header; memset(header, 0x0, sizeof(*header)); header->free = h->opts->chunk_size - sizeof(struct tsnif_storage_chunk_header); chunk->current_data = header->rec0; chunk->current_index = (void*) (header) + h->opts->chunk_size - sizeof(uint32_t); TSNIF_DEBUG(STORAGE, "chunk header %p\n", chunk->header); TSNIF_DEBUG(STORAGE, "chunk data %p\n", chunk->current_data); TSNIF_DEBUG(STORAGE, "chunk index %p\n", chunk->current_index); TSNIF_DEBUG(STORAGE, "chunk size 0x%x\n", h->opts->chunk_size); TSNIF_DEBUG(STORAGE, "chunk free %d\n", header->free); return 0; } static int map_write_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk, int what) { return (map_chunk(h, chunk, what) || clear_chunk(h, chunk)); } static int map_read_chunk(struct tsnif_storage_handle *h, struct tsnif_storage_chunk *chunk, int what) { int err; err = map_chunk(h, chunk, what); if ((err < 0 ) || (err == TSNIF_STORAGE_READ_EOF)) return err; chunk->read_index = -1; return 0; } static int get_chunk_write_rec(struct tsnif_storage_handle *h, int size) { struct tsnif_storage_chunk *chunk = &h->active_chunk; struct tsnif_storage_chunk_header *chunk_header = chunk->header; TSNIF_DEBUG(STORAGE, "chunk header %p\n", chunk_header); /* first time */ if (!chunk_header) return map_write_chunk(h, chunk, STORAGE_MMAP_FIRST); /* we fit into current chunk */ #define INDEX_OVERHEAD (4*sizeof(uint32_t)) if (chunk_header->free > (size + sizeof(struct tsnif_storage_rec_mmap) + INDEX_OVERHEAD)) return 0; /* we dont fit into current chunk, * let's go for another */ return map_write_chunk(h, chunk, STORAGE_MMAP_NEXT); } static int get_chunk_read_rec(struct tsnif_storage_handle *h, int what) { struct tsnif_storage_chunk *chunk = &h->active_chunk; struct tsnif_storage_chunk_header *chunk_header = chunk->header; int map_what = 0; TSNIF_DEBUG(STORAGE, "chunk header %p\n", chunk_header); /* first time */ if (!chunk_header) return map_read_chunk(h, chunk, STORAGE_MMAP_FIRST); TSNIF_DEBUG(STORAGE, "chunk cnt 0x%x\n", chunk_header->cnt); TSNIF_DEBUG(STORAGE, "read index 0x%x\n", chunk->read_index); /* we want next record and we have it */ if (TSNIF_STORAGE_READ_NEXT == what) { if ((chunk->read_index + 1) < chunk_header->cnt) return 0; map_what = STORAGE_MMAP_NEXT; } /* we want previous record and we have it */ if (TSNIF_STORAGE_READ_PREV == what) { if (chunk->read_index) return 0; map_what = STORAGE_MMAP_PREV; } return map_read_chunk(h, chunk, map_what); } static int store_rec(struct tsnif_storage_handle *h, struct tsnif_storage_rec *rec) { struct tsnif_storage_rec_mmap *mrec; struct tsnif_storage_chunk *chunk = &h->active_chunk; struct tsnif_storage_chunk_header *header = chunk->header; TSNIF_DEBUG(STORAGE, "chunk data %p\n", chunk->current_data); TSNIF_DEBUG(STORAGE, "chunk index %p\n", chunk->current_index); TSNIF_DEBUG(STORAGE, "time %ld\n", rec->time.tv_sec); mrec = chunk->current_data; mrec->len = rec->len; mrec->flags = rec->flags; mrec->time = rec->time; mrec->ws = rec->ws; memcpy(mrec->data, rec->ptr, rec->len); #define RECLEN (sizeof(struct tsnif_storage_rec_mmap) + rec->len) *(chunk->current_index) = (void*) chunk->current_data - (void*) header; TSNIF_DEBUG(STORAGE, "rec offset 0x%x\n", *(chunk->current_index)); TSNIF_DEBUG(STORAGE, "rec size 0x%x\n", rec->len); chunk->current_index--; chunk->current_data += RECLEN; /* update file header */ h->header->cnt++; /* update chunk header */ header->cnt++; header->free -= (sizeof(uint32_t) + RECLEN); TSNIF_DEBUG(STORAGE, "chunk data %p\n", chunk->current_data); TSNIF_DEBUG(STORAGE, "chunk index %p\n", chunk->current_index); TSNIF_DEBUG(STORAGE, "chunk free %d\n", header->free); TSNIF_DEBUG(STORAGE, "chunk cnt %d\n", header->cnt); return 0; } static int read_rec(struct tsnif_storage_handle *h, struct tsnif_storage_rec *rec, int what) { struct tsnif_storage_rec_mmap *mrec; struct tsnif_storage_chunk *chunk = &h->active_chunk; struct tsnif_storage_chunk_header *header = chunk->header; off_t offset; TSNIF_DEBUG(STORAGE, "chunk data %p\n", chunk->current_data); TSNIF_DEBUG(STORAGE, "chunk index %p\n", chunk->current_index); TSNIF_DEBUG(STORAGE, "read index 0x%x\n", chunk->read_index); /* the chunk got loaded just now */ if (chunk->read_index == -1) { if (what == TSNIF_STORAGE_READ_NEXT) chunk->read_index = 0; else chunk->read_index = chunk->header->cnt - 1; /* move within the chunk */ } else { if (what == TSNIF_STORAGE_READ_NEXT) chunk->read_index++; else chunk->read_index--; } offset = *(chunk->current_index - chunk->read_index); mrec = (void*) header + offset; rec->ptr = mrec->data; rec->len = mrec->len; rec->flags = mrec->flags; rec->time = mrec->time; rec->ws = mrec->ws; return 0; } int tsnif_storage_write(struct tsnif_storage_handle *h, struct tsnif_storage_rec *rec) { int err; TSNIF_DEBUG(STORAGE, "entry\n"); err = get_chunk_write_rec(h, rec->len); if (err) return err; return store_rec(h, rec); } int tsnif_storage_read(struct tsnif_storage_handle *h, int what, struct tsnif_storage_rec *rec) { int err; TSNIF_DEBUG(STORAGE, "entry\n"); err = get_chunk_read_rec(h, what); if (err) return err; return read_rec(h, rec, what); } int tsnif_storage_term_type(struct tsnif_storage_handle *h) { return h->header->common.term_type; }