--- /dev/null
+/*
+ * Copyright (c) 2012-2013 File systems and Storage Lab (FSL)
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published by
+ * the Free Software Foundation.
+ *
+ */
+
+#include <linux/errno.h>
+
+#include "dm-bufio.h"
+#include "dm-dedup-dtb.h"
+#include "dm-dedup-backend.h"
+#include "dm-dedup-kvstore.h"
+
+#define EMPTY_ENTRY -5
+#define DELETED_ENTRY -6
+
+#define UINT32_MAX (4294967295U)
+#define METADATA_BLOCK_SIZE 4096
+#define METADATA_MAX_HELD_PER_THREAD 5
+#define METADATA_SUPERBLOCK_START 0
+/* The Answer to the Ultimate Question of Life, The Universe, and Everything */
+#define METADATA_SUPERBLOCK_MAGIC 42424242
+
+/*
+ * On-disk superblock for the disktable backend, written at block
+ * METADATA_SUPERBLOCK_START. All multi-byte fields are little-endian
+ * (__le32/__le64); convert with le{32,64}_to_cpu()/cpu_to_le{32,64}()
+ * on every access. Any private data saved via set_private_data is
+ * appended directly after this struct within the same metadata block.
+ */
+struct metadata_superblock {
+	__le64 blocknr; /* This block number, dm_block_t. */
+	__le64 magic; /* Magic number to check against */
+	__le64 data_space_map_start; /* Data space map start block */
+	__le64 data_space_map_end; /* Data space map end block */
+	__le64 data_space_map_smax; /* Maximum number of entries */
+	__le64 data_space_map_allocptr; /* Data space map last allocptr */
+	__le32 data_space_map_vsize; /* Data space map value size */
+	__le64 lbn_pcn_start; /* lbn pcn table start block. */
+	__le64 lbn_pcn_end; /* lbn pcn table end block. */
+	__le32 lbn_pcn_bitmap_size; /* Size of space bitmap per block */
+	__le32 lbn_pcn_num_entries; /* Number of entries stored per block */
+	__le32 lbn_pcn_ksize; /* Key size */
+	__le32 lbn_pcn_vsize; /* Value size */
+	__le32 lbn_pcn_kmax; /* Maximum number of keys */
+	__le64 hash_pcn_start; /* hash pcn table start block. */
+	__le64 hash_pcn_end; /* hash pcn table end block. */
+	__le32 hash_pcn_bitmap_size; /* Size of space bitmap per block */
+	__le32 hash_pcn_num_entries; /* Number of entries stored per block */
+	__le32 hash_pcn_ksize; /* Key size */
+	__le32 hash_pcn_vsize; /* Value size */
+	__le32 hash_pcn_kmax; /* Maximum number of keys */
+	__le64 metadata_block_size; /* In bytes */
+	__le64 metadata_nr_blocks;/* Number of metadata blocks used. */
+} __packed;
+
+/*
+ * In-memory state for one disktable metadata backend instance.
+ * Owns the dm-bufio client and a cached copy of the on-disk
+ * superblock. Created by init_meta_disktable(), released by
+ * exit_meta_disktable().
+ */
+struct metadata {
+	struct block_device *metadata_dev;
+	struct dm_bufio_client *client;
+	/* First metadata block not yet assigned to any structure. */
+	uint64_t currently_allocated_on_disk_offset;
+
+	/* Superblock information */
+	struct metadata_superblock *disk_super;
+	/*
+	 * Raw copy of the whole superblock block, populated only on the
+	 * reconstruct path; the private-data blob appended after the
+	 * superblock is recovered from here.
+	 */
+	void *disk_super_data;
+	/* 1 when formatting fresh metadata, 0 when reconstructing. */
+	int create_new;
+
+	/* Space Map */
+	uint64_t smax;		/* Number of refcount entries (data blocks). */
+	uint64_t allocptr;	/* Next-fit rotor for alloc_data_block. */
+	uint64_t smap_start_block;
+	uint64_t smap_end_block;
+
+	/*
+	 * XXX: Currently we support only one linear and one sparse KVS.
+	 */
+	struct kvstore_disktable *kvs_linear;
+	struct kvstore_disktable *kvs_sparse;
+
+	/* Private data */
+	void *private;
+	uint32_t priv_size;
+};
+
+/*
+ * One on-disk key-value store (linear or sparse) backed by a
+ * contiguous range of metadata blocks [start_block, end_block].
+ * Each block begins with a presence bitmap of bitmap_size bytes,
+ * followed by fixed-size entries. Embeds the generic struct kvstore
+ * as its first member; the kvs_* callbacks recover this struct via
+ * container_of() on the ckvs pointer.
+ */
+struct kvstore_disktable {
+	struct kvstore ckvs;
+	uint32_t kmax;			/* Maximum number of keys. */
+	sector_t start_block;
+	sector_t end_block;
+	struct metadata *md;
+	uint32_t num_entries_per_block;
+	uint32_t bitmap_size;		/* Bitmap bytes at the head of each block. */
+};
+
+/* dm-bufio buffer-allocation hook: nothing to initialize. */
+static void disktable_metadata_alloc_callback(struct dm_buffer *buf)
+{
+}
+
+/* dm-bufio pre-write hook: no per-buffer bookkeeping required. */
+static void disktable_metadata_write_callback(struct dm_buffer *buf)
+{
+}
+
+/*
+ * Create (or reconstruct) the disktable metadata backend on top of a
+ * dm-bufio client.
+ *
+ * p points to a struct init_param_disktable:
+ *   - reconstruct_meta == 0: format a fresh superblock and space map;
+ *   - reconstruct_meta == 1: read and validate an existing superblock;
+ *   - any other value is rejected with -EINVAL.
+ *
+ * Returns the new struct metadata on success or an ERR_PTR() on
+ * failure; the caller releases it with exit_meta_disktable().
+ *
+ * NOTE(review): the unformatted output parameter is never written by
+ * this function -- confirm callers do not rely on it.
+ */
+static struct metadata *init_meta_disktable(void *p, bool *unformatted)
+{
+	int create_new = 1;
+	uint64_t smap_size, start, end, i, smax, allocptr;
+	struct metadata *md;
+	struct dm_bufio_client *client;
+	struct dm_buffer *buf;
+	struct init_param_disktable *param = (struct init_param_disktable *)p;
+	void *ptr, *ret, *disk_super_data = NULL;
+	struct metadata_superblock *disk_super = NULL;
+
+	md = kmalloc(sizeof(*md), GFP_KERNEL);
+	if (!md)
+		return ERR_PTR(-ENOMEM);
+
+	client = dm_bufio_client_create(param->metadata_dev,
+		METADATA_BLOCK_SIZE, METADATA_MAX_HELD_PER_THREAD, 0,
+		disktable_metadata_alloc_callback,
+		disktable_metadata_write_callback);
+
+	/*
+	 * NOTE(review): if client is NULL (not an ERR_PTR), NULL is
+	 * returned to the caller -- verify callers handle that case.
+	 */
+	if (IS_ERR_OR_NULL(client)) {
+		ret = (struct metadata *)client;
+		goto out_md;
+	}
+
+	disk_super = kmalloc(sizeof(struct metadata_superblock), GFP_KERNEL);
+	if (!disk_super) {
+		ret = ERR_PTR(-ENOMEM);
+		goto out_client;
+	}
+	memset(disk_super, 0, sizeof(struct metadata_superblock));
+
+	/* Map the reconstruct_meta flag onto the create_new decision. */
+	if (!param->reconstruct_meta)
+		create_new = 1;
+	else if (param->reconstruct_meta == 1)
+		create_new = 0;
+	else {
+		ret = ERR_PTR(-EINVAL);
+		goto out_disk_super;
+	}
+
+	if (!create_new) {
+		/* Read superblock from disk and try to reconstruct */
+		ptr = dm_bufio_read(client, METADATA_SUPERBLOCK_START, &buf);
+		if (unlikely(IS_ERR(ptr))) {
+			ret = ptr;
+			goto out_disk_super;
+		}
+
+		memcpy(disk_super, ptr, sizeof(struct metadata_superblock));
+
+		/*
+		 * NOTE(review): magic is __le64 but compared to a host
+		 * constant without le64_to_cpu() -- correct only on
+		 * little-endian hosts; verify.
+		 */
+		if (disk_super->magic != METADATA_SUPERBLOCK_MAGIC) {
+			pr_err("Superblock is invalid, cannot setup backend.");
+			dm_bufio_release(buf);
+			ret = ERR_PTR(-EINVAL);
+			goto out_disk_super;
+		}
+
+		/* Keep a raw copy of the block so private data survives. */
+		disk_super_data = kmalloc(le64_to_cpu(
+				disk_super->metadata_block_size), GFP_KERNEL);
+		if (!disk_super_data) {
+			dm_bufio_release(buf);
+			ret = ERR_PTR(-ENOMEM);
+			goto out_disk_super;
+		}
+		memcpy(disk_super_data, ptr, le64_to_cpu(
+				disk_super->metadata_block_size));
+
+		dm_bufio_release(buf);
+
+		start = le64_to_cpu(disk_super->data_space_map_start);
+		end = le64_to_cpu(disk_super->data_space_map_end);
+		smax = le64_to_cpu(disk_super->data_space_map_smax);
+		if (smax != param->blocks) {
+			pr_err("The number of blocks sent as parameter "
+				"don't match what is saved on disk.");
+			ret = ERR_PTR(-EINVAL);
+			goto out_disk_super_data;
+		}
+
+		allocptr = le64_to_cpu(disk_super->data_space_map_allocptr);
+		smap_size = smax *
+			le32_to_cpu(disk_super->data_space_map_vsize);
+	} else {
+		/* Need to initialize a new superblock on the disk */
+		smax = param->blocks;
+		smap_size = param->blocks * sizeof(uint32_t);
+		allocptr = 0;
+
+		/* Space map occupies the blocks right after the superblock. */
+		start = 1;
+		end = start + (smap_size / METADATA_BLOCK_SIZE) + 1;
+
+		/* Initialize space map on disk */
+		for (i = start; i <= end; i++) {
+			ptr = dm_bufio_new(client, i, &buf);
+			if (unlikely(IS_ERR(ptr))) {
+				ret = ptr;
+				goto out_disk_super;
+			}
+
+			memset(ptr, 0, METADATA_BLOCK_SIZE);
+			dm_bufio_mark_buffer_dirty(buf);
+			dm_bufio_release(buf);
+		}
+
+		disk_super->blocknr = cpu_to_le64(METADATA_SUPERBLOCK_START);
+		disk_super->magic = cpu_to_le64(METADATA_SUPERBLOCK_MAGIC);
+		disk_super->data_space_map_start = cpu_to_le64(start);
+		disk_super->data_space_map_end = cpu_to_le64(end);
+		disk_super->data_space_map_allocptr = cpu_to_le64(allocptr);
+		disk_super->data_space_map_vsize =
+					cpu_to_le32(sizeof(uint32_t));
+		disk_super->data_space_map_smax = cpu_to_le64(smax);
+		disk_super->metadata_block_size =
+					cpu_to_le64(METADATA_BLOCK_SIZE);
+		disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+	}
+
+	md->smax = smax;
+	md->allocptr = allocptr;
+	md->kvs_linear = NULL;
+	md->kvs_sparse = NULL;
+	md->metadata_dev = param->metadata_dev;
+	md->client = client;
+	md->smap_start_block = start;
+	md->smap_end_block = end;
+	/*
+	 * NOTE(review): metadata_nr_blocks is __le64 and is assigned
+	 * without le64_to_cpu() -- only correct on little-endian hosts.
+	 */
+	md->currently_allocated_on_disk_offset =
+				disk_super->metadata_nr_blocks;
+	md->disk_super = disk_super;
+	md->create_new = create_new;
+	md->private = NULL;
+	md->priv_size = 0;
+	md->disk_super_data = disk_super_data;
+
+	DMINFO("Initializing DM_BUFIO backend");
+
+	pr_info("Space required on disk for pcn reference count map: "
+		"%llu.%06llu MB\n", smap_size / (1024 * 1024),
+		smap_size - ((smap_size /
+		(1024 * 1024)) * (1024 * 1024)));
+
+	return md;
+
+out_disk_super_data:
+	if (disk_super_data)
+		kfree(disk_super_data);
+out_disk_super:
+	kfree(disk_super);
+out_client:
+	dm_bufio_client_destroy(client);
+out_md:
+	kfree(md);
+	md = NULL;
+	return ret;
+}
+
+/*
+ * Tear down the disktable backend: persist the superblock (with the
+ * current space-map allocation pointer and any private data appended),
+ * flush all dirty buffers, then free every in-memory structure.
+ *
+ * Flush failures are logged but not propagated -- the caller cannot
+ * recover at teardown time.
+ */
+static void exit_meta_disktable(struct metadata *md)
+{
+	int ret;
+	struct dm_buffer *buf = NULL;
+	void *p;
+
+	p = dm_bufio_new(md->client, METADATA_SUPERBLOCK_START, &buf);
+	if (!IS_ERR(p)) {
+		md->disk_super->data_space_map_allocptr =
+						cpu_to_le64(md->allocptr);
+		/*
+		 * metadata_block_size is stored little-endian on disk;
+		 * convert before using it as an in-memory length (the
+		 * original passed the raw __le64).
+		 */
+		memset(p, 0, le64_to_cpu(md->disk_super->metadata_block_size));
+		memcpy(p, md->disk_super, sizeof(struct metadata_superblock));
+
+		if (md->private)
+			memcpy(p + sizeof(struct metadata_superblock),
+			       md->private, md->priv_size);
+
+		dm_bufio_mark_buffer_dirty(buf);
+		dm_bufio_release(buf);
+	} else {
+		DMWARN("Getting superblock from disk failed");
+	}
+
+	ret = dm_bufio_write_dirty_buffers(md->client);
+	if (ret)
+		DMWARN("Writing dirty buffers failed");
+
+	ret = dm_bufio_issue_flush(md->client);
+	if (ret)
+		DMWARN("Flushing buffers failed");
+
+	/* kfree(NULL) is a no-op, so no NULL guards are needed. */
+	kfree(md->kvs_linear);
+	kfree(md->kvs_sparse);
+	kfree(md->disk_super);
+	kfree(md->disk_super_data);
+	kfree(md->private);
+
+	dm_bufio_client_destroy(md->client);
+
+	kfree(md);
+}
+
+
+/*
+ * Write the in-memory superblock (plus any private data appended after
+ * it) back to block METADATA_SUPERBLOCK_START and flush all dirty
+ * buffers to disk.
+ *
+ * Returns 0 on success or a negative errno from dm-bufio.
+ */
+static int flush_meta_disktable(struct metadata *md)
+{
+	int ret;
+	struct dm_buffer *buf = NULL;
+	void *p;
+
+	p = dm_bufio_new(md->client, METADATA_SUPERBLOCK_START, &buf);
+	if (unlikely(IS_ERR(p)))
+		return PTR_ERR(p);
+
+	/*
+	 * metadata_block_size is little-endian on disk; convert before
+	 * using it as a length (the original passed the raw __le64).
+	 */
+	memset(p, 0, le64_to_cpu(md->disk_super->metadata_block_size));
+	memcpy(p, md->disk_super, sizeof(struct metadata_superblock));
+
+	if (md->private)
+		memcpy(p + sizeof(struct metadata_superblock),
+		       md->private, md->priv_size);
+
+	dm_bufio_mark_buffer_dirty(buf);
+	dm_bufio_release(buf);
+
+	ret = dm_bufio_write_dirty_buffers(md->client);
+	if (ret)
+		return ret;
+
+	return dm_bufio_issue_flush(md->client);
+}
+
+
+/********************************************************
+ * Private Data Functions *
+ ********************************************************/
+
+/*
+ * Return the private data blob that was saved alongside the superblock.
+ *
+ * If a cached copy exists it is returned directly; otherwise the first
+ * priv_size bytes after the superblock in the saved raw superblock
+ * block (reconstruct path only) are copied into a fresh allocation.
+ * The returned pointer remains owned by md.
+ *
+ * Returns 0 on success, -EINVAL if priv_size is zero or no saved data
+ * exists, -ENOMEM on allocation failure.
+ */
+static int get_private_data_disktable(struct metadata *md, void **priv,
+				      uint32_t priv_size)
+{
+	/* priv_size is unsigned, so "<= 0" could only ever mean zero. */
+	if (!priv_size)
+		return -EINVAL;
+
+	if (md->private) {
+		*priv = md->private;
+		return 0;
+	}
+
+	/* No cached copy: try the raw superblock saved at init time. */
+	if (!md->disk_super_data)
+		return -EINVAL;
+
+	md->private = kmalloc(priv_size, GFP_KERNEL);
+	if (!md->private)
+		return -ENOMEM;
+
+	memcpy(md->private, md->disk_super_data +
+	       sizeof(struct metadata_superblock), priv_size);
+	md->priv_size = priv_size;
+	*priv = md->private;
+
+	return 0;
+}
+
+/*
+ * Replace the private data blob kept with the metadata; it is written
+ * out after the superblock on the next flush/exit. The caller keeps
+ * ownership of priv; a copy is taken here.
+ *
+ * Returns 0 on success, -EINVAL on NULL/zero-sized input, -ENOMEM on
+ * allocation failure (in which case the previous blob is preserved --
+ * the original freed it first and could lose it).
+ */
+static int set_private_data_disktable(struct metadata *md, void *priv,
+				      uint32_t priv_size)
+{
+	void *copy;
+
+	if (!priv_size || !priv)
+		return -EINVAL;
+
+	copy = kmalloc(priv_size, GFP_KERNEL);
+	if (!copy)
+		return -ENOMEM;
+	memcpy(copy, priv, priv_size);
+
+	/* Swap in only after the allocation succeeded; kfree(NULL) is ok. */
+	kfree(md->private);
+	md->private = copy;
+	md->priv_size = priv_size;
+
+	return 0;
+}
+
+/********************************************************
+ * Space Management Functions *
+ ********************************************************/
+
+/*
+ * Allocate a free data block using a next-fit scan of the on-disk
+ * space map, starting from md->allocptr and wrapping around once.
+ * A free slot is a 32-bit refcount of 0; it is set to 1 and its index
+ * is returned in *blockn.
+ *
+ * The dm-bufio buffer is kept across iterations and only re-read when
+ * the scan crosses into a different metadata block.
+ *
+ * Returns 0 on success, -ENOSPC when the map is full, or a dm-bufio
+ * read error.
+ */
+static int alloc_data_block_disktable(struct metadata *md, uint64_t *blockn)
+{
+	uint64_t head, tail, actual_block, prev_block = 0;
+	uint32_t offset_in_block, block_size, first_run = 1;
+	struct dm_buffer *buf = NULL;
+	uint32_t value;
+	void *p = NULL;
+
+	head = tail = md->allocptr;
+	block_size = dm_bufio_get_block_size(md->client);
+
+	do {
+		/* Map the entry index to (metadata block, byte offset). */
+		actual_block = md->smap_start_block + (head *
+				sizeof(uint32_t)) / block_size;
+		offset_in_block = (head * sizeof(uint32_t)) % block_size;
+
+		/* Only read from disk when a new block is required */
+		if ((actual_block != prev_block) || first_run) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(md->client, actual_block, &buf);
+			if (unlikely(IS_ERR(p)))
+				return PTR_ERR(p);
+
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		value = *((uint32_t *)(p + offset_in_block));
+
+		if (!value) {
+			/* Free slot: claim it with refcount 1. */
+			value = 1;
+			memcpy(p + offset_in_block, &value, sizeof(uint32_t));
+			dm_bufio_mark_buffer_dirty(buf);
+			dm_bufio_release(buf);
+			*blockn = head;
+			/* Resume the next search just past this block. */
+			md->allocptr = (head + 1) % md->smax;
+			return 0;
+		}
+
+		head = (head + 1) % md->smax;
+
+	} while (head != tail);
+
+	if (buf)
+		dm_bufio_release(buf);
+
+	return -ENOSPC;
+}
+
+/*
+ * Increment the on-disk reference count of physical block @blockn.
+ *
+ * Returns 0 on success, -ERANGE if @blockn lies outside the space map,
+ * -E2BIG if the 32-bit counter is already saturated, or a dm-bufio
+ * read error.
+ */
+static int inc_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+	uint32_t bsize, off, refcount;
+	uint64_t map_block;
+	struct dm_buffer *buf;
+	void *data;
+	int ret = 0;
+
+	if (blockn >= md->smax)
+		return -ERANGE;
+
+	/* Locate the 32-bit counter inside the space-map blocks. */
+	bsize = dm_bufio_get_block_size(md->client);
+	map_block = md->smap_start_block +
+		    (blockn * sizeof(uint32_t)) / bsize;
+	off = (blockn * sizeof(uint32_t)) % bsize;
+
+	data = dm_bufio_read(md->client, map_block, &buf);
+	if (unlikely(IS_ERR(data)))
+		return PTR_ERR(data);
+
+	refcount = *((uint32_t *)(data + off));
+	if (refcount == UINT32_MAX) {
+		ret = -E2BIG;
+	} else {
+		refcount++;
+		memcpy(data + off, &refcount, sizeof(uint32_t));
+		dm_bufio_mark_buffer_dirty(buf);
+	}
+
+	dm_bufio_release(buf);
+	return ret;
+}
+
+/*
+ * Decrement the on-disk reference count of physical block @blockn.
+ *
+ * Returns 0 on success, -ERANGE if @blockn lies outside the space map,
+ * -EFAULT if the counter is already zero, or a dm-bufio read error.
+ */
+static int dec_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+	uint32_t bsize, off, refcount;
+	uint64_t map_block;
+	struct dm_buffer *buf;
+	void *data;
+	int ret = 0;
+
+	if (blockn >= md->smax)
+		return -ERANGE;
+
+	/* Locate the 32-bit counter inside the space-map blocks. */
+	bsize = dm_bufio_get_block_size(md->client);
+	map_block = md->smap_start_block +
+		    (blockn * sizeof(uint32_t)) / bsize;
+	off = (blockn * sizeof(uint32_t)) % bsize;
+
+	data = dm_bufio_read(md->client, map_block, &buf);
+	if (unlikely(IS_ERR(data)))
+		return PTR_ERR(data);
+
+	refcount = *((uint32_t *)(data + off));
+	if (!refcount) {
+		ret = -EFAULT;
+	} else {
+		refcount--;
+		memcpy(data + off, &refcount, sizeof(uint32_t));
+		dm_bufio_mark_buffer_dirty(buf);
+	}
+
+	dm_bufio_release(buf);
+	return ret;
+}
+
+/*
+ * Read the reference count of physical block @blockn from the on-disk
+ * space map.
+ *
+ * Returns the count (>= 0), -ERANGE if @blockn is out of bounds, or a
+ * dm-bufio read error.
+ *
+ * NOTE(review): the counter is uint32_t but the return type is int, so
+ * a count above INT_MAX would be returned negative and could be
+ * mistaken for an error -- confirm counts stay small in practice.
+ */
+static int get_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+	uint32_t block_size, offset_in_block, value;
+	uint64_t actual_block;
+	struct dm_buffer *buf;
+	void *p;
+
+	if (blockn >= md->smax)
+		return -ERANGE;
+
+	/* Locate the 32-bit counter inside the space-map blocks. */
+	block_size = dm_bufio_get_block_size(md->client);
+	actual_block = md->smap_start_block + (blockn *
+			sizeof(uint32_t)) / block_size;
+	offset_in_block = (blockn * sizeof(uint32_t)) % block_size;
+
+	p = dm_bufio_read(md->client, actual_block, &buf);
+	if (unlikely(IS_ERR(p)))
+		return PTR_ERR(p);
+
+	value = *((uint32_t *)(p + offset_in_block));
+
+	dm_bufio_release(buf);
+
+	return value;
+}
+
+/********************************************************
+ * General KVS Functions *
+ ********************************************************/
+
+/*
+ * Dead code: generic dispatch wrappers around the kvstore function
+ * pointers, disabled with #if 0. Kept for reference only; consider
+ * deleting them if no caller is planned.
+ */
+#if 0
+static int kvs_delete_disktable(struct kvstore *kvs, void *key, int32_t ksize)
+{
+	int r;
+
+	r = 0;
+	if (kvs->kvs_delete)
+		r = kvs->kvs_delete(kvs, key, ksize);
+
+	return r;
+}
+
+static int kvs_lookup_disktable(struct kvstore *kvs, void *key,
+		int32_t ksize, void *value, int32_t *vsize)
+{
+	int r;
+
+	r = 0;
+	if (kvs->kvs_lookup)
+		r = kvs->kvs_lookup(kvs, key, ksize, value, vsize);
+
+	return r;
+}
+
+static int kvs_insert_disktable(struct kvstore *kvs, void *key,
+		int32_t ksize, void *value, int32_t vsize)
+{
+	int r;
+
+	r = 0;
+	if (kvs->kvs_insert)
+		r = kvs->kvs_insert(kvs, key, ksize, value, vsize);
+
+	return r;
+}
+
+/*
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ * Make sure that the store is implemented such that
+ * deletion in-place is safe while iterating.
+ */
+static int kvs_iterate_disktable(struct kvstore *kvs, int (*itr_action)
+		(void *key, int32_t ksize, void *value, int32_t vsize,
+		void *data), void *data)
+{
+	int r;
+
+	r = 0;
+	if (kvs->kvs_iterate)
+		r = kvs->kvs_iterate(kvs, itr_action, data);
+
+	return r;
+}
+#endif
+
+/*********************************************************
+ * Linear KVS Functions *
+ *********************************************************/
+
+/*
+ * Delete the entry at linear index *key by clearing its presence bit
+ * in the per-block bitmap. The value bytes are left in place; only
+ * the bitmap marks the slot free.
+ *
+ * Returns 0 on success, -EINVAL on key-size mismatch, -ERANGE if the
+ * index falls outside the store, -ENODEV if no entry is present at
+ * that index, -ENOMEM on allocation failure, or a dm-bufio error.
+ *
+ * (Removed the unused block_size and offset_in_block locals the
+ * original computed but never used.)
+ */
+static int kvs_delete_linear_disktable(struct kvstore *kvs,
+				       void *key, int32_t ksize)
+{
+	int ret = 0;
+	uint64_t idx, actual_block;
+	uint32_t bitmap_offset;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	idx = *((uint64_t *)key);
+	if (idx > kvdtb->kmax)
+		return -ERANGE;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+	bitmap_offset = idx % kvdtb->num_entries_per_block;
+	if (actual_block > kvdtb->end_block) {
+		ret = -ERANGE;
+		goto out_bitmap;
+	}
+
+	p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+	if (unlikely(IS_ERR(p))) {
+		ret = PTR_ERR(p);
+		goto out_bitmap;
+	}
+
+	memcpy(bitmap, p, kvdtb->bitmap_size);
+	if (test_bit(bitmap_offset, bitmap) == 0) {
+		ret = -ENODEV;
+		goto out_buf;
+	}
+
+	bitmap_clear(bitmap, bitmap_offset, 1);
+	memcpy(p, bitmap, kvdtb->bitmap_size);
+	dm_bufio_mark_buffer_dirty(buf);
+
+out_buf:
+	dm_bufio_release(buf);
+out_bitmap:
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * 0 - not found
+ * 1 - found
+ * < 0 - error on lookup
+ */
+/*
+ * Look up the value stored at linear index *key.
+ *
+ * Returns 1 and copies the value into value/*vsize when the slot is
+ * present, 0 when the slot is empty, or a negative errno (-EINVAL on
+ * key-size mismatch, -ERANGE on out-of-bounds index, -ENOMEM, or a
+ * dm-bufio error).
+ *
+ * (Removed the unused block_size local the original computed but
+ * never used.)
+ */
+static int kvs_lookup_linear_disktable(struct kvstore *kvs, void *key,
+		int32_t ksize, void *value, int32_t *vsize)
+{
+	int ret = 1;
+	uint64_t idx, actual_block;
+	uint32_t offset_in_block, bitmap_offset;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	idx = *((uint64_t *)key);
+	if (idx > kvdtb->kmax)
+		return -ERANGE;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+	bitmap_offset = idx % kvdtb->num_entries_per_block;
+	offset_in_block = bitmap_offset * kvs->vsize + kvdtb->bitmap_size;
+	if (actual_block > kvdtb->end_block) {
+		ret = -ERANGE;
+		goto out_bitmap;
+	}
+
+	p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+	if (unlikely(IS_ERR(p))) {
+		ret = PTR_ERR(p);
+		goto out_bitmap;
+	}
+
+	memcpy(bitmap, p, kvdtb->bitmap_size);
+	if (test_bit(bitmap_offset, bitmap) == 0) {
+		ret = 0;
+		goto out_buf;
+	}
+
+	memcpy(value, p + offset_in_block, kvs->vsize);
+	*vsize = kvs->vsize;
+
+out_buf:
+	dm_bufio_release(buf);
+out_bitmap:
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * Insert (or overwrite) the value at linear index *key: mark the slot
+ * present in the per-block bitmap and copy the value bytes into it.
+ *
+ * Returns 0 on success, -EINVAL on key/value size mismatch, -ERANGE
+ * if the index falls outside the store, -ENOMEM on allocation
+ * failure, or a dm-bufio error.
+ *
+ * (Removed the unused block_size local the original computed but
+ * never used.)
+ */
+static int kvs_insert_linear_disktable(struct kvstore *kvs, void *key,
+				       int32_t ksize, void *value,
+				       int32_t vsize)
+{
+	int ret = 0;
+	uint64_t idx, actual_block;
+	uint32_t offset_in_block, bitmap_offset;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	if (vsize != kvs->vsize)
+		return -EINVAL;
+
+	idx = *((uint64_t *)key);
+	if (idx > kvdtb->kmax)
+		return -ERANGE;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+	bitmap_offset = idx % kvdtb->num_entries_per_block;
+	offset_in_block = bitmap_offset * kvs->vsize + kvdtb->bitmap_size;
+	if (actual_block > kvdtb->end_block) {
+		ret = -ERANGE;
+		goto out_bitmap;
+	}
+
+	p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+	if (unlikely(IS_ERR(p))) {
+		ret = PTR_ERR(p);
+		goto out_bitmap;
+	}
+
+	memcpy(bitmap, p, kvdtb->bitmap_size);
+
+	bitmap_set(bitmap, bitmap_offset, 1);
+	memcpy(p, bitmap, kvdtb->bitmap_size);
+	memcpy(p + offset_in_block, value, kvs->vsize);
+	dm_bufio_mark_buffer_dirty(buf);
+
+	dm_bufio_release(buf);
+out_bitmap:
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ * Make sure that the store is implemented such that
+ * deletion in-place is safe while iterating.
+ */
+/*
+ * Walk every occupied slot of the linear store, invoking
+ * iteration_action(key, ksize, value, vsize, data) for each present
+ * entry. Stops early and returns the callback's value if it is
+ * negative. The dm-bufio buffer is cached across indices and only
+ * re-read when the walk crosses a block boundary.
+ *
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ * make sure that the store is implemented such that
+ * deletion in-place is safe while iterating.
+ *
+ * (Removed the unused block_size local the original computed but
+ * never used.)
+ */
+static int kvs_iterate_linear_disktable(struct kvstore *kvs,
+		int (*iteration_action)(void *key, int32_t ksize,
+		void *value, int32_t vsize, void *data), void *data)
+{
+	uint64_t i, actual_block, prev_block = 0;
+	uint32_t offset_in_block, bitmap_offset;
+	struct dm_buffer *buf = NULL;
+	void *p = NULL;
+	unsigned long *bitmap;
+	int ret = 0, first_run = 1;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	for (i = 0; i < kvdtb->kmax; i++) {
+		actual_block = kvdtb->start_block + i /
+			       kvdtb->num_entries_per_block;
+		bitmap_offset = i % kvdtb->num_entries_per_block;
+		offset_in_block = bitmap_offset * kvs->vsize +
+				  kvdtb->bitmap_size;
+		if (actual_block > kvdtb->end_block) {
+			ret = -ERANGE;
+			goto out;
+		}
+
+		/* Only hit dm-bufio when crossing into a new block. */
+		if (first_run || (actual_block != prev_block)) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(kvdtb->md->client, actual_block,
+					  &buf);
+			if (unlikely(IS_ERR(p))) {
+				ret = PTR_ERR(p);
+				goto out;
+			}
+
+			memcpy(bitmap, p, kvdtb->bitmap_size);
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		if (test_bit(bitmap_offset, bitmap) != 0) {
+			ret = iteration_action((void *)&i, kvs->ksize,
+					       p + offset_in_block,
+					       kvs->vsize, data);
+			if (ret < 0)
+				goto out;
+		}
+	}
+
+out:
+	if (buf)
+		dm_bufio_release(buf);
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * Create (or reattach to) the linear LBN->PCN key-value store.
+ *
+ * On a fresh format (md->create_new), allocate and zero the store's
+ * block range after the space map, compute the per-block layout
+ * (presence bitmap + fixed-size values), and record it all in the
+ * superblock. On reconstruct, validate the caller's ksize/vsize/kmax
+ * against the superblock and reuse the stored layout.
+ *
+ * Returns the embedded struct kvstore on success or an ERR_PTR().
+ *
+ * NOTE(review): the unformatted parameter is never used -- confirm
+ * whether callers expect it to be honored here.
+ */
+static struct kvstore *kvs_create_linear_disktable(struct metadata *md,
+	uint32_t ksize, uint32_t vsize, uint32_t kmax, bool unformatted)
+{
+	struct kvstore_disktable *kvs;
+	uint64_t kvstore_size, start, end, i;
+	sector_t metadata_dev_size;
+	uint32_t block_size, num_entries_per_block, bitmap_size, num_blocks;
+	void *p = NULL;
+	struct dm_buffer *buf = NULL;
+
+	if (md->create_new) {
+		if (!vsize || !ksize || !kmax)
+			return ERR_PTR(-ENOTSUPP);
+
+		/* Currently only 64bit keys are supported */
+		if (ksize != 8)
+			return ERR_PTR(-ENOTSUPP);
+	}
+
+	/* We do not support two or more KVSs at the moment */
+	if (md->kvs_linear)
+		return ERR_PTR(-EBUSY);
+
+	kvs = kmalloc(sizeof(*kvs), GFP_KERNEL);
+	if (!kvs)
+		return ERR_PTR(-ENOMEM);
+
+	block_size = dm_bufio_get_block_size(md->client);
+
+	if (!md->create_new) {
+		/* Reconstruct path: parameters must match the superblock. */
+		if (le32_to_cpu(md->disk_super->lbn_pcn_vsize) != vsize) {
+			pr_err("Value size passed does not match value size "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		if (le32_to_cpu(md->disk_super->lbn_pcn_ksize) != ksize) {
+			pr_err("Key size passed does not match key size "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		if (le32_to_cpu(md->disk_super->lbn_pcn_kmax) != kmax) {
+			pr_err("Max keys passed does not match max keys "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		start = le64_to_cpu(md->disk_super->lbn_pcn_start);
+		end = le64_to_cpu(md->disk_super->lbn_pcn_end);
+		num_entries_per_block = le32_to_cpu(
+					md->disk_super->lbn_pcn_num_entries);
+		bitmap_size = le32_to_cpu(
+					md->disk_super->lbn_pcn_bitmap_size);
+		num_blocks = (kmax / num_entries_per_block) + 1;
+		kvstore_size = num_blocks * block_size;
+	} else {
+		metadata_dev_size = dm_bufio_get_device_size(md->client);
+
+		/*
+		 * Shrink entries-per-block until the leftover space can
+		 * hold one presence bit per entry (bitmap_size counted
+		 * in bits here, converted to bytes below).
+		 */
+		num_entries_per_block = block_size / vsize;
+		bitmap_size = block_size - num_entries_per_block * vsize;
+		while (bitmap_size < num_entries_per_block) {
+			bitmap_size += vsize * 8;
+			num_entries_per_block--;
+		}
+
+		/* We want the size in bytes */
+		bitmap_size = bitmap_size / 8;
+
+		num_blocks = (kmax / num_entries_per_block) + 1;
+		kvstore_size = num_blocks * block_size;
+
+		if (((kvstore_size / block_size) +
+			md->currently_allocated_on_disk_offset) >
+			metadata_dev_size) {
+			pr_err("Linear kvs store cannot be created, "
+				"metadata device too small");
+			kfree(kvs);
+			return ERR_PTR(-ENOMEM);
+		}
+
+		start = md->currently_allocated_on_disk_offset;
+		end = md->currently_allocated_on_disk_offset +
+			(kvstore_size / block_size) + 1;
+
+		/* Zero the whole range so bitmaps start empty. */
+		for (i = start; i <= end; i++) {
+			p = dm_bufio_new(md->client, i, &buf);
+			if (unlikely(IS_ERR(p))) {
+				kfree(kvs);
+				return p;
+			}
+
+			memset(p, 0, block_size);
+			dm_bufio_mark_buffer_dirty(buf);
+			dm_bufio_release(buf);
+		}
+
+		md->disk_super->lbn_pcn_vsize = cpu_to_le32(vsize);
+		md->disk_super->lbn_pcn_ksize = cpu_to_le32(ksize);
+		md->disk_super->lbn_pcn_kmax = cpu_to_le32(kmax);
+		md->disk_super->lbn_pcn_start = cpu_to_le64(start);
+		md->disk_super->lbn_pcn_end = cpu_to_le64(end);
+		md->disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+		md->disk_super->lbn_pcn_num_entries =
+					cpu_to_le32(num_entries_per_block);
+		md->disk_super->lbn_pcn_bitmap_size = cpu_to_le32(bitmap_size);
+	}
+
+	/*
+	 * NOTE(review): the "%llu.%06llu MB" format prints the raw byte
+	 * remainder, not a six-digit megabyte fraction -- verify intent.
+	 */
+	pr_info("Space required on disk for linear key value store: "
+		"%llu.%06llu MB\n", kvstore_size / (1024 * 1024),
+		kvstore_size - ((kvstore_size / (1024 * 1024))
+		* (1024 * 1024)));
+
+	kvs->ckvs.vsize = vsize;
+	kvs->ckvs.ksize = ksize;
+	kvs->kmax = kmax;
+	kvs->start_block = start;
+	kvs->end_block = end;
+	kvs->bitmap_size = bitmap_size;
+	kvs->num_entries_per_block = num_entries_per_block;
+	kvs->md = md;
+
+	kvs->ckvs.kvs_insert = kvs_insert_linear_disktable;
+	kvs->ckvs.kvs_lookup = kvs_lookup_linear_disktable;
+	kvs->ckvs.kvs_delete = kvs_delete_linear_disktable;
+	kvs->ckvs.kvs_iterate = kvs_iterate_linear_disktable;
+	md->kvs_linear = kvs;
+	md->currently_allocated_on_disk_offset =
+			le64_to_cpu(md->disk_super->metadata_nr_blocks);
+
+	return &(kvs->ckvs);
+}
+
+/********************************************************
+ * Sparse KVS Functions *
+ ********************************************************/
+
+/*
+ * Delete the entry matching @key from the sparse (hash) store.
+ *
+ * Open addressing with linear probing: starting at idxhead % kmax,
+ * probe until the key is found (its slot is tombstoned by setting the
+ * second bitmap bit), an empty slot is hit (-ENODEV, key absent), or
+ * the probe wraps around (-ENODEV). Each slot owns two bitmap bits:
+ * bit 0 = present, bit 1 = deleted.
+ *
+ * Fixes: the original did "actual_block = prev_block" after each
+ * buffer read, so prev_block never advanced and every probe re-read
+ * the buffer; the sibling insert/iterate functions do it correctly.
+ * Also removed the unused block_size local.
+ */
+static int kvs_delete_sparse_disktable(struct kvstore *kvs,
+				       void *key, int32_t ksize)
+{
+	uint64_t idxhead = *((uint64_t *)key);
+	uint32_t entry_size, head, tail, offset_in_block;
+	uint32_t first_run = 1, bitmap_offset;
+	int ret = 0;
+	uint64_t actual_block, prev_block = 0;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p = NULL;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	entry_size = kvs->vsize + kvs->ksize;
+	head = idxhead % kvdtb->kmax;
+	tail = head;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	do {
+		actual_block = kvdtb->start_block + head /
+			       kvdtb->num_entries_per_block;
+		bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+		offset_in_block = (head % kvdtb->num_entries_per_block) *
+				  entry_size + kvdtb->bitmap_size;
+
+		if (actual_block > kvdtb->end_block) {
+			ret = -ERANGE;
+			goto out;
+		}
+
+		if (first_run || (actual_block != prev_block)) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(kvdtb->md->client, actual_block,
+					  &buf);
+			if (unlikely(IS_ERR(p))) {
+				ret = PTR_ERR(p);
+				goto out;
+			}
+
+			memcpy(bitmap, p, kvdtb->bitmap_size);
+			/* Fix: was "actual_block = prev_block". */
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		if (test_bit(bitmap_offset, bitmap) == 0) {
+			ret = -ENODEV;
+			goto out;
+		}
+
+		if (memcmp(p + offset_in_block, key, kvs->ksize)) {
+			head = (head + 1) % kvdtb->kmax;
+		} else {
+			/* Tombstone the slot via its "deleted" bit. */
+			bitmap_set(bitmap, bitmap_offset + 1, 1);
+			memcpy(p, bitmap, kvdtb->bitmap_size);
+
+			dm_bufio_mark_buffer_dirty(buf);
+
+			ret = 0;
+			goto out;
+		}
+	} while (head != tail);
+
+	ret = -ENODEV;
+
+out:
+	if (buf)
+		dm_bufio_release(buf);
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * 0 - not found
+ * 1 - found
+ * < 0 - error on lookup
+ */
+/*
+ * Look up @key in the sparse (hash) store by linear probing from
+ * idxhead % kmax. Tombstoned slots (deleted bit set) are skipped;
+ * an empty slot (present bit clear) terminates the probe.
+ *
+ * Returns 1 and fills value/*vsize when found, 0 when not found, or a
+ * negative errno.
+ *
+ * Fixes: the original did "actual_block = prev_block" after each
+ * buffer read, so prev_block never advanced and every probe re-read
+ * the buffer; the sibling insert/iterate functions do it correctly.
+ * Also removed the unused block_size local.
+ */
+static int kvs_lookup_sparse_disktable(struct kvstore *kvs, void *key,
+		int32_t ksize, void *value, int32_t *vsize)
+{
+	uint64_t idxhead = *((uint64_t *)key);
+	uint32_t entry_size, head, tail, offset_in_block;
+	uint32_t first_run = 1, bitmap_offset;
+	uint64_t actual_block, prev_block = 0;
+	int ret = 0;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p = NULL;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	entry_size = kvs->vsize + kvs->ksize;
+	head = idxhead % kvdtb->kmax;
+	tail = head;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	do {
+		actual_block = kvdtb->start_block + head /
+			       kvdtb->num_entries_per_block;
+		bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+		offset_in_block = (head % kvdtb->num_entries_per_block) *
+				  entry_size + kvdtb->bitmap_size;
+
+		if (actual_block > kvdtb->end_block) {
+			ret = -ERANGE;
+			goto out;
+		}
+
+		if (first_run || (actual_block != prev_block)) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(kvdtb->md->client, actual_block,
+					  &buf);
+			if (unlikely(IS_ERR(p))) {
+				ret = PTR_ERR(p);
+				goto out;
+			}
+
+			memcpy(bitmap, p, kvdtb->bitmap_size);
+			/* Fix: was "actual_block = prev_block". */
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		/* Empty slot: the key cannot exist further along. */
+		if (test_bit(bitmap_offset, bitmap) == 0) {
+			ret = 0;
+			goto out;
+		}
+
+		/* Tombstoned slot: skip, but keep probing. */
+		if (test_bit(bitmap_offset + 1, bitmap) != 0) {
+			head = (head + 1) % kvdtb->kmax;
+			continue;
+		}
+
+		if (memcmp(p + offset_in_block, key, kvs->ksize)) {
+			head = (head + 1) % kvdtb->kmax;
+		} else {
+			memcpy(value, p + offset_in_block + kvs->ksize,
+			       kvs->vsize);
+
+			*vsize = kvs->vsize;
+
+			ret = 1;
+			goto out;
+		}
+
+	} while (head != tail);
+
+	ret = 0;
+
+out:
+	if (buf)
+		dm_bufio_release(buf);
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * Insert @key/@value into the sparse (hash) store by linear probing
+ * from idxhead % kmax for a slot that is empty or tombstoned. The
+ * slot is marked present and its deleted bit cleared.
+ *
+ * Returns 0 on success, -EINVAL on key-size mismatch, -ENOSPC if no
+ * slot is free after a full wrap, -ERANGE/-ENOMEM or a dm-bufio error
+ * otherwise.
+ *
+ * Fixes: the original accepted any ksize <= kvs->ksize but then
+ * memcpy'd kvs->ksize bytes from the key, reading past a shorter
+ * buffer; require an exact match like the sibling lookup/delete do.
+ * Also removed the unused block_size local.
+ */
+static int kvs_insert_sparse_disktable(struct kvstore *kvs, void *key,
+		int32_t ksize, void *value, int32_t vsize)
+{
+	uint64_t idxhead = *((uint64_t *)key);
+	uint32_t entry_size, head, tail, offset_in_block;
+	uint32_t first_run = 1, bitmap_offset;
+	uint64_t actual_block, prev_block = 0;
+	struct dm_buffer *buf = NULL;
+	int ret = 0;
+	unsigned long *bitmap;
+	void *p = NULL;
+	struct kvstore_disktable *kvdtb;
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	/* Fix: was "ksize > kvs->ksize", which allowed short keys. */
+	if (ksize != kvs->ksize)
+		return -EINVAL;
+
+	entry_size = kvs->vsize + kvs->ksize;
+	head = idxhead % kvdtb->kmax;
+	tail = head;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	do {
+		actual_block = kvdtb->start_block + head /
+			       kvdtb->num_entries_per_block;
+		bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+		offset_in_block = (head % kvdtb->num_entries_per_block) *
+				  entry_size + kvdtb->bitmap_size;
+
+		if (actual_block > kvdtb->end_block) {
+			ret = -ERANGE;
+			goto out;
+		}
+
+		if (first_run || (actual_block != prev_block)) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(kvdtb->md->client, actual_block,
+					  &buf);
+			if (unlikely(IS_ERR(p))) {
+				ret = PTR_ERR(p);
+				goto out;
+			}
+
+			memcpy(bitmap, p, kvdtb->bitmap_size);
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		/* Usable slot: empty, or tombstoned by a prior delete. */
+		if ((test_bit(bitmap_offset, bitmap) == 0) ||
+		    (test_bit(bitmap_offset + 1, bitmap) != 0)) {
+			memcpy(p + offset_in_block, key, kvs->ksize);
+			memcpy(p + offset_in_block + kvs->ksize, value,
+			       kvs->vsize);
+
+			bitmap_set(bitmap, bitmap_offset, 1);
+			bitmap_clear(bitmap, bitmap_offset + 1, 1);
+			memcpy(p, bitmap, kvdtb->bitmap_size);
+
+			dm_bufio_mark_buffer_dirty(buf);
+
+			ret = 0;
+			goto out;
+		}
+
+		head = (head + 1) % kvdtb->kmax;
+
+	} while (head != tail);
+
+	ret = -ENOSPC;
+
+out:
+	if (buf)
+		dm_bufio_release(buf);
+	kfree(bitmap);
+	return ret;
+}
+
+/*
+ * Walk every live entry of the sparse store (present bit set, deleted
+ * bit clear), invoking iteration_action(key, ksize, value, vsize,
+ * data) for each. Stops early if the callback returns a negative
+ * value. The walk covers all kmax slots once, starting at slot 0.
+ *
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ * make sure that the store is implemented such that
+ * deletion in-place is safe while iterating.
+ *
+ * (Removed the unused block_size local the original computed but
+ * never used.)
+ */
+static int kvs_iterate_sparse_disktable(struct kvstore *kvs,
+		int (*iteration_action)(void *key, int32_t ksize,
+		void *value, int32_t vsize, void *data), void *data)
+{
+	int err = 0;
+	uint32_t entry_size, head = 0, offset_in_block;
+	uint32_t first_run = 1, bitmap_offset = 0;
+	uint64_t actual_block, prev_block = 0;
+	struct dm_buffer *buf = NULL;
+	unsigned long *bitmap;
+	void *p = NULL;
+	struct kvstore_disktable *kvdtb;
+
+	BUG_ON(!kvs);
+
+	kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+	entry_size = kvs->vsize + kvs->ksize;
+
+	bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+	if (!bitmap)
+		return -ENOMEM;
+
+	do {
+		actual_block = kvdtb->start_block + head /
+			       kvdtb->num_entries_per_block;
+		bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+		offset_in_block = (head % kvdtb->num_entries_per_block) *
+				  entry_size + kvdtb->bitmap_size;
+
+		if (actual_block > kvdtb->end_block) {
+			err = -ERANGE;
+			goto out;
+		}
+
+		/* Only hit dm-bufio when crossing into a new block. */
+		if (first_run || (actual_block != prev_block)) {
+			if (buf && !first_run) {
+				dm_bufio_release(buf);
+				buf = NULL;
+			}
+
+			p = dm_bufio_read(kvdtb->md->client, actual_block,
+					  &buf);
+			if (unlikely(IS_ERR(p))) {
+				err = PTR_ERR(p);
+				goto out;
+			}
+
+			memcpy(bitmap, p, kvdtb->bitmap_size);
+			prev_block = actual_block;
+			first_run = 0;
+		}
+
+		if ((test_bit(bitmap_offset, bitmap) != 0) &&
+		    (test_bit(bitmap_offset + 1, bitmap) == 0)) {
+			err = iteration_action(p + offset_in_block,
+					       kvs->ksize,
+					       p + kvs->ksize + offset_in_block,
+					       kvs->vsize, data);
+
+			if (err < 0)
+				goto out;
+		}
+
+		/* Stops after one full pass, when head wraps to 0. */
+		head = (head + 1) % kvdtb->kmax;
+	} while (head);
+
+out:
+	if (buf)
+		dm_bufio_release(buf);
+	kfree(bitmap);
+	return err;
+}
+
+/*
+ * Create (or reattach to) the sparse on-disk hash->pcn key-value store.
+ *
+ * On a fresh metadata device (md->create_new) the store geometry is
+ * computed, its blocks are zeroed and the superblock fields are filled
+ * in.  Otherwise the geometry stored in the superblock is validated
+ * against the caller-supplied ksize/vsize/knummax.
+ *
+ * Returns a pointer to the embedded struct kvstore on success or an
+ * ERR_PTR() on failure.
+ */
+static struct kvstore *kvs_create_sparse_disktable(struct metadata *md,
+		uint32_t ksize, uint32_t vsize, uint32_t knummax,
+		bool unformatted)
+{
+	struct kvstore_disktable *kvs;
+	uint64_t kvstore_size, metadata_dev_size, start, end, i, num_blocks;
+	uint32_t block_size, num_entries_per_block, bitmap_size;
+	struct dm_buffer *buf = NULL;
+	void *p;
+
+	if (md->create_new) {
+		if (!vsize || !ksize || !knummax)
+			return ERR_PTR(-ENOTSUPP);
+
+		/* We do not support two or more KVSs at the moment */
+		if (md->kvs_sparse)
+			return ERR_PTR(-EBUSY);
+	}
+
+	kvs = kmalloc(sizeof(*kvs), GFP_KERNEL);
+	if (!kvs)
+		return ERR_PTR(-ENOMEM);
+
+	block_size = dm_bufio_get_block_size(md->client);
+
+	if (!md->create_new) {
+		/* Reattach: verify the on-disk geometry matches the caller. */
+		if (le32_to_cpu(md->disk_super->hash_pcn_vsize) != vsize) {
+			pr_err("Value size passed does not match value size "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		if (le32_to_cpu(md->disk_super->hash_pcn_ksize) != ksize) {
+			pr_err("Key size passed does not match key size "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		if (le32_to_cpu(md->disk_super->hash_pcn_kmax) != knummax) {
+			pr_err("Max keys passed does not match max keys "
+				"stored on metadata disk");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		start = le64_to_cpu(md->disk_super->hash_pcn_start);
+		end = le64_to_cpu(md->disk_super->hash_pcn_end);
+		num_entries_per_block =
+			le32_to_cpu(md->disk_super->hash_pcn_num_entries);
+		bitmap_size =
+			le32_to_cpu(md->disk_super->hash_pcn_bitmap_size);
+		num_blocks = (knummax / num_entries_per_block) + 1;
+		kvstore_size = num_blocks * block_size;
+	} else {
+		metadata_dev_size = dm_bufio_get_device_size(md->client);
+
+		num_entries_per_block = block_size / (vsize + ksize);
+
+		/* Bits left over in the block after the packed entries. */
+		bitmap_size = (block_size - num_entries_per_block *
+				(vsize + ksize)) * 8;
+
+		/*
+		 * Each entry needs two bitmap bits; give up one entry at
+		 * a time until the leftover space can hold the bitmap.
+		 */
+		while (bitmap_size < (num_entries_per_block * 2)) {
+			bitmap_size += (vsize + ksize) * 8;
+			num_entries_per_block--;
+		}
+
+		/*
+		 * A single entry (plus its bitmap bits) does not fit in
+		 * one metadata block; bail out instead of dividing by
+		 * zero below.
+		 */
+		if (!num_entries_per_block) {
+			pr_err("Sparse kvs store cannot be created, "
+				"entry too large for metadata block");
+			kfree(kvs);
+			return ERR_PTR(-EINVAL);
+		}
+
+		/* We want the size in bytes */
+		bitmap_size = bitmap_size / 8;
+
+		num_blocks = (knummax / num_entries_per_block) + 1;
+		kvstore_size = num_blocks * block_size;
+
+		/*
+		 * One block more than num_blocks is initialized below
+		 * (end = start + num_blocks + 1), so account for it in
+		 * the capacity check.
+		 */
+		if (((kvstore_size / block_size) + 1 +
+		    md->currently_allocated_on_disk_offset) >
+		    metadata_dev_size) {
+			pr_err("Sparse kvs store cannot be created, "
+				"metadata device too small");
+			kfree(kvs);
+			return ERR_PTR(-ENOMEM);
+		}
+
+		start = md->currently_allocated_on_disk_offset;
+		end = md->currently_allocated_on_disk_offset +
+			(kvstore_size / block_size) + 1;
+
+		/* Zero-fill every block of the new store. */
+		for (i = start; i < end; i++) {
+			p = dm_bufio_new(md->client, i, &buf);
+			if (unlikely(IS_ERR(p))) {
+				kfree(kvs);
+				return p;
+			}
+
+			memset(p, 0, block_size);
+			dm_bufio_mark_buffer_dirty(buf);
+			dm_bufio_release(buf);
+		}
+
+		/* Persist the geometry in the superblock. */
+		md->disk_super->hash_pcn_vsize = cpu_to_le32(vsize);
+		md->disk_super->hash_pcn_ksize = cpu_to_le32(ksize);
+		md->disk_super->hash_pcn_kmax = cpu_to_le32(knummax);
+		md->disk_super->hash_pcn_start = cpu_to_le64(start);
+		md->disk_super->hash_pcn_end = cpu_to_le64(end);
+		md->disk_super->hash_pcn_num_entries =
+				cpu_to_le32(num_entries_per_block);
+		md->disk_super->hash_pcn_bitmap_size =
+				cpu_to_le32(bitmap_size);
+		md->disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+	}
+
+	/*
+	 * NOTE(review): the fractional part below prints the raw byte
+	 * remainder (0..1048575) as if it were microunits of a MB, so
+	 * the displayed decimal is approximate — confirm intent.
+	 */
+	pr_info("Space required on disk for sparse key value store: "
+		"%llu.%06llu MB\n", kvstore_size / (1024 * 1024),
+		kvstore_size - ((kvstore_size / (1024 * 1024))
+		* (1024 * 1024)));
+
+	kvs->ckvs.vsize = vsize;
+	kvs->ckvs.ksize = ksize;
+	kvs->kmax = knummax;
+	kvs->start_block = start;
+	kvs->end_block = end;
+	kvs->num_entries_per_block = num_entries_per_block;
+	kvs->bitmap_size = bitmap_size;
+	kvs->md = md;
+
+	kvs->ckvs.kvs_insert = kvs_insert_sparse_disktable;
+	kvs->ckvs.kvs_lookup = kvs_lookup_sparse_disktable;
+	kvs->ckvs.kvs_delete = kvs_delete_sparse_disktable;
+	kvs->ckvs.kvs_iterate = kvs_iterate_sparse_disktable;
+	md->currently_allocated_on_disk_offset =
+		le64_to_cpu(md->disk_super->metadata_nr_blocks);
+
+	md->kvs_sparse = kvs;
+
+	return &(kvs->ckvs);
+}
+
+/*
+ * Flush the dm-bufio cache for the metadata device.  Delegates to
+ * drop_buffers_user() — presumably drops user-held cached buffers so
+ * later reads go to disk; verify against dm-bufio helper definition.
+ */
+void flush_bufio_cache_disktable(struct metadata *md)
+{
+	/* Redundant trailing "return;" removed (checkpatch warning). */
+	drop_buffers_user(md->client);
+}
+
+/*
+ * Disk-table backend vtable: wires the disktable implementations into
+ * the generic dm-dedup metadata backend interface.
+ */
+struct metadata_ops metadata_ops_disktable = {
+ .init_meta = init_meta_disktable,
+ .exit_meta = exit_meta_disktable,
+ .kvs_create_linear = kvs_create_linear_disktable,
+ .kvs_create_sparse = kvs_create_sparse_disktable,
+
+ .alloc_data_block = alloc_data_block_disktable,
+ .inc_refcount = inc_refcount_disktable,
+ .dec_refcount = dec_refcount_disktable,
+ .get_refcount = get_refcount_disktable,
+
+ .flush_meta = flush_meta_disktable,
+
+ .get_private_data = get_private_data_disktable,
+ .set_private_data = set_private_data_disktable,
+
+ .flush_bufio_cache = flush_bufio_cache_disktable,
+};