Add disktable code dm-dedup-disktable
authorSonam Mandal <sonam.dp42@gmail.com>
Tue, 9 Feb 2016 05:36:40 +0000 (00:36 -0500)
committerSonam Mandal <sonam.dp42@gmail.com>
Tue, 9 Feb 2016 05:36:40 +0000 (00:36 -0500)
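Introduce a new "disktable" metadata backend for dm-dedup.  The backend
keeps all dedup metadata on the metadata device through dm-bufio: a
superblock, a reference-count space map for physical blocks, a linear
(direct-indexed) LBN->PBN key-value store, and a sparse open-addressed
HASH->PBN key-value store.  dm-bufio gains an exported drop_buffers_user()
helper so the backend can drop its buffer cache, and garbage collection in
the target is reworked as a mark-and-sweep pass over the LBN->PBN store.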
drivers/md/Makefile
drivers/md/dm-bufio.c
drivers/md/dm-bufio.h
drivers/md/dm-dedup-dtb.c [new file with mode: 0644]
drivers/md/dm-dedup-dtb.h [new file with mode: 0644]
drivers/md/dm-dedup-target.c

index c3651e366c06f1041f6ac6485929ed00f01180db..d9aa9549de47bee0fe8c47fc2e9bad17e6981559 100644 (file)
@@ -14,7 +14,7 @@ dm-thin-pool-y        += dm-thin.o dm-thin-metadata.o
 dm-cache-y     += dm-cache-target.o dm-cache-metadata.o dm-cache-policy.o
 dm-cache-mq-y   += dm-cache-policy-mq.o
 dm-cache-cleaner-y += dm-cache-policy-cleaner.o
-dm-dedup-y     += dm-dedup-target.o dm-dedup-hash.o dm-dedup-rw.o dm-dedup-cbt.o dm-dedup-ram.o
+dm-dedup-y     += dm-dedup-target.o dm-dedup-hash.o dm-dedup-rw.o dm-dedup-cbt.o dm-dedup-ram.o dm-dedup-dtb.o
 dm-era-y       += dm-era-target.o
 md-mod-y       += md.o bitmap.o
 raid456-y      += raid5.o
index ab472c557d18c060d8230bbfcadbd747b36f0fd0..f544c34c5ec38ac1e520d4c511871d07f7cc5b9b 100644 (file)
@@ -1432,6 +1432,26 @@ static void drop_buffers(struct dm_bufio_client *c)
        dm_bufio_unlock(c);
 }
 
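+/*
+ * Write back all dirty buffers asynchronously, then free every buffer
+ * that is not currently held by a user.  This lets callers such as the
+ * dm-dedup disktable backend drop the bufio cache on demand.
+ */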
+void drop_buffers_user(struct dm_bufio_client *c)
+{
+       struct dm_buffer *b;
+
+       BUG_ON(dm_bufio_in_request());
+
+       /*
+        * An optimization so that the buffers are not written one-by-one.
+        */
+       dm_bufio_write_dirty_buffers_async(c);
+
+       dm_bufio_lock(c);
+
+       while ((b = __get_unclaimed_buffer(c)))
+               __free_buffer_wake(b);
+
+       dm_bufio_unlock(c);
+}
+EXPORT_SYMBOL_GPL(drop_buffers_user);
+
 /*
  * Test if the buffer is unused and too old, and commit it.
 * And if noio is set, we must not do any I/O because we hold
index c096779a7292f729c5198ee42781de2d46ffbe05..f6a4b4ee54beef97f49435420fdaf6a791f38247 100644 (file)
@@ -126,6 +126,7 @@ sector_t dm_bufio_get_block_number(struct dm_buffer *b);
 void *dm_bufio_get_block_data(struct dm_buffer *b);
 void *dm_bufio_get_aux_data(struct dm_buffer *b);
 struct dm_bufio_client *dm_bufio_get_client(struct dm_buffer *b);
+void drop_buffers_user(struct dm_bufio_client *c);
 
 /*----------------------------------------------------------------*/
 
diff --git a/drivers/md/dm-dedup-dtb.c b/drivers/md/dm-dedup-dtb.c
new file mode 100644 (file)
index 0000000..d83dec5
--- /dev/null
@@ -0,0 +1,1498 @@
+/*
+ * Copyright (c) 2012-2013 File systems and Storage Lab (FSL)
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published by
+ * the Free Software Foundation.
+ *
+ */
+
+#include <linux/errno.h>
+
+#include "dm-bufio.h"
+#include "dm-dedup-dtb.h"
+#include "dm-dedup-backend.h"
+#include "dm-dedup-kvstore.h"
+
+#define EMPTY_ENTRY -5
+#define DELETED_ENTRY -6
+
+#define UINT32_MAX     (4294967295U)
+#define METADATA_BLOCK_SIZE    4096
+#define METADATA_MAX_HELD_PER_THREAD   5
+#define METADATA_SUPERBLOCK_START      0
+/* The Answer to the Ultimate Question of Life, The Universe, and Everything */
+#define METADATA_SUPERBLOCK_MAGIC      42424242
+
+struct metadata_superblock {
+       __le64 blocknr; /* This block number, dm_block_t. */
+       __le64 magic; /* Magic number to check against */
+       __le64 data_space_map_start; /* Data space map start block */
+       __le64 data_space_map_end; /* Data space map end block */
+       __le64 data_space_map_smax; /* Maximum number of entries */
+       __le64 data_space_map_allocptr; /* Data space map last allocptr */
+       __le32 data_space_map_vsize; /* Data space map value size */
+       __le64 lbn_pcn_start; /* lbn pcn table start block. */
+       __le64 lbn_pcn_end; /* lbn pcn table end block. */
+       __le32 lbn_pcn_bitmap_size; /* Size of space bitmap per block  */
+       __le32 lbn_pcn_num_entries; /* Number of entries stored per block */
+       __le32 lbn_pcn_ksize; /* Key size */
+       __le32 lbn_pcn_vsize; /* Value size */
+       __le32 lbn_pcn_kmax; /* Maximum number of keys */
+       __le64 hash_pcn_start; /* hash pcn table start block. */
+       __le64 hash_pcn_end; /* hash pcn table end block. */
+       __le32 hash_pcn_bitmap_size; /* Size of space bitmap per block  */
+       __le32 hash_pcn_num_entries; /* Number of entries stored per block */
+       __le32 hash_pcn_ksize; /* Key size */
+       __le32 hash_pcn_vsize; /* Value size */
+       __le32 hash_pcn_kmax; /* Maximum number of keys */
+       __le64 metadata_block_size; /* In bytes */
+       __le64 metadata_nr_blocks;/* Number of metadata blocks used. */
+} __packed;
+
+struct metadata {
+       struct block_device *metadata_dev;
+       struct dm_bufio_client *client;
+       uint64_t currently_allocated_on_disk_offset;
+
+       /* Superblock information */
+       struct metadata_superblock *disk_super;
+       void *disk_super_data;
+       int create_new;
+
+       /* Space Map */
+       uint64_t smax;
+       uint64_t allocptr;
+       uint64_t smap_start_block;
+       uint64_t smap_end_block;
+
+       /*
+        * XXX: Currently we support only one linear and one sparse KVS.
+        */
+       struct kvstore_disktable *kvs_linear;
+       struct kvstore_disktable *kvs_sparse;
+
+       /* Private data */
+       void *private;
+       uint32_t priv_size;
+};
+
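+/*
+ * Each on-disk KVS block starts with a presence bitmap of bitmap_size
+ * bytes, followed by num_entries_per_block fixed-size entries.
+ */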
+struct kvstore_disktable {
+       struct kvstore ckvs;
+       uint32_t kmax;
+       sector_t start_block;
+       sector_t end_block;
+       struct metadata *md;
+       uint32_t num_entries_per_block;
+       uint32_t bitmap_size;
+};
+
+static void disktable_metadata_alloc_callback(struct dm_buffer *buf)
+{
+}
+
+static void disktable_metadata_write_callback(struct dm_buffer *buf)
+{
+}
+
+static struct metadata *init_meta_disktable(void *p, bool *unformatted)
+{
+       int create_new = 1;
+       uint64_t smap_size, start, end, i, smax, allocptr;
+       struct metadata *md;
+       struct dm_bufio_client *client;
+       struct dm_buffer *buf;
+       struct init_param_disktable *param = (struct init_param_disktable *)p;
+       void *ptr, *ret, *disk_super_data = NULL;
+       struct metadata_superblock *disk_super = NULL;
+
+       md = kmalloc(sizeof(*md), GFP_KERNEL);
+       if (!md)
+               return ERR_PTR(-ENOMEM);
+
+       client = dm_bufio_client_create(param->metadata_dev,
+                       METADATA_BLOCK_SIZE, METADATA_MAX_HELD_PER_THREAD, 0,
+                       disktable_metadata_alloc_callback,
+                       disktable_metadata_write_callback);
+
+       if (IS_ERR_OR_NULL(client)) {
+               ret = client ? (void *)client : ERR_PTR(-ENOMEM);
+               goto out_md;
+       }
+
+       disk_super = kmalloc(sizeof(struct metadata_superblock), GFP_KERNEL);
+       if (!disk_super) {
+               ret = ERR_PTR(-ENOMEM);
+               goto out_client;
+       }
+       memset(disk_super, 0, sizeof(struct metadata_superblock));
+
+       if (!param->reconstruct_meta)
+               create_new = 1;
+       else if (param->reconstruct_meta == 1)
+               create_new = 0;
+       else {
+               ret = ERR_PTR(-EINVAL);
+               goto out_disk_super;
+       }
+
+       if (!create_new) {
+               /* Read superblock from disk and try to reconstruct */
+               ptr = dm_bufio_read(client, METADATA_SUPERBLOCK_START, &buf);
+               if (unlikely(IS_ERR(ptr))) {
+                       ret = ptr;
+                       goto out_disk_super;
+               }
+
+               memcpy(disk_super, ptr, sizeof(struct metadata_superblock));
+
+               if (le64_to_cpu(disk_super->magic) !=
+                               METADATA_SUPERBLOCK_MAGIC) {
+                       pr_err("Superblock is invalid, cannot set up backend.");
+                       dm_bufio_release(buf);
+                       ret = ERR_PTR(-EINVAL);
+                       goto out_disk_super;
+               }
+
+               disk_super_data = kmalloc(le64_to_cpu(
+                       disk_super->metadata_block_size), GFP_KERNEL);
+               if (!disk_super_data) {
+                       dm_bufio_release(buf);
+                       ret = ERR_PTR(-ENOMEM);
+                       goto out_disk_super;
+               }
+               memcpy(disk_super_data, ptr, le64_to_cpu(
+                                       disk_super->metadata_block_size));
+
+               dm_bufio_release(buf);
+
+               start = le64_to_cpu(disk_super->data_space_map_start);
+               end = le64_to_cpu(disk_super->data_space_map_end);
+               smax = le64_to_cpu(disk_super->data_space_map_smax);
+               if (smax != param->blocks) {
+                       pr_err("The number of blocks passed as a parameter "
+                                       "doesn't match what is saved on disk.");
+                       ret = ERR_PTR(-EINVAL);
+                       goto out_disk_super_data;
+               }
+
+               allocptr = le64_to_cpu(disk_super->data_space_map_allocptr);
+               smap_size = smax *
+                       le32_to_cpu(disk_super->data_space_map_vsize);
+       } else {
+               /* Need to initialize a new superblock on the disk */
+               smax = param->blocks;
+               smap_size = param->blocks * sizeof(uint32_t);
+               allocptr = 0;
+
+               start = 1;
+               end = start + (smap_size / METADATA_BLOCK_SIZE) + 1;
+
+               /* Initialize space map on disk */
+               for (i = start; i <= end; i++) {
+                       ptr = dm_bufio_new(client, i, &buf);
+                       if (unlikely(IS_ERR(ptr))) {
+                               ret = ptr;
+                               goto out_disk_super;
+                       }
+
+                       memset(ptr, 0, METADATA_BLOCK_SIZE);
+                       dm_bufio_mark_buffer_dirty(buf);
+                       dm_bufio_release(buf);
+               }
+
+               disk_super->blocknr = cpu_to_le64(METADATA_SUPERBLOCK_START);
+               disk_super->magic = cpu_to_le64(METADATA_SUPERBLOCK_MAGIC);
+               disk_super->data_space_map_start = cpu_to_le64(start);
+               disk_super->data_space_map_end = cpu_to_le64(end);
+               disk_super->data_space_map_allocptr = cpu_to_le64(allocptr);
+               disk_super->data_space_map_vsize =
+                       cpu_to_le32(sizeof(uint32_t));
+               disk_super->data_space_map_smax = cpu_to_le64(smax);
+               disk_super->metadata_block_size =
+                       cpu_to_le64(METADATA_BLOCK_SIZE);
+               disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+       }
+
+       md->smax = smax;
+       md->allocptr = allocptr;
+       md->kvs_linear = NULL;
+       md->kvs_sparse = NULL;
+       md->metadata_dev = param->metadata_dev;
+       md->client = client;
+       md->smap_start_block = start;
+       md->smap_end_block = end;
+       md->currently_allocated_on_disk_offset =
+               le64_to_cpu(disk_super->metadata_nr_blocks);
+       md->disk_super = disk_super;
+       md->create_new = create_new;
+       md->private = NULL;
+       md->priv_size = 0;
+       md->disk_super_data = disk_super_data;
+
+       DMINFO("Initializing DM_BUFIO backend");
+
+       pr_info("Space required on disk for pcn reference count map: "
+                       "%llu.%06llu MB\n", smap_size / (1024 * 1024),
+                       smap_size - ((smap_size /
+                                       (1024 * 1024)) * (1024 * 1024)));
+
+       *unformatted = create_new;
+
+       return md;
+
+out_disk_super_data:
+       kfree(disk_super_data);
+out_disk_super:
+       kfree(disk_super);
+out_client:
+       dm_bufio_client_destroy(client);
+out_md:
+       kfree(md);
+       md = NULL;
+       return ret;
+}
+
+static void exit_meta_disktable(struct metadata *md)
+{
+       int ret = 0;
+       struct dm_buffer *buf = NULL;
+       void *p = NULL;
+
+       p = dm_bufio_new(md->client, METADATA_SUPERBLOCK_START, &buf);
+       if (!IS_ERR(p)) {
+               md->disk_super->data_space_map_allocptr =
+                       cpu_to_le64(md->allocptr);
+               memset(p, 0, le64_to_cpu(md->disk_super->metadata_block_size));
+               memcpy(p, md->disk_super, sizeof(struct metadata_superblock));
+
+               if (md->private)
+                       memcpy(p + sizeof(struct metadata_superblock),
+                                       md->private, md->priv_size);
+
+               dm_bufio_mark_buffer_dirty(buf);
+               dm_bufio_release(buf);
+       } else
+               DMWARN("Getting superblock from disk failed");
+
+       ret = dm_bufio_write_dirty_buffers(md->client);
+       if (ret)
+               DMWARN("Writing dirty buffers failed");
+
+       ret = dm_bufio_issue_flush(md->client);
+       if (ret)
+               DMWARN("Flushing buffers failed");
+
+       kfree(md->kvs_linear);
+       kfree(md->kvs_sparse);
+       kfree(md->disk_super);
+       kfree(md->disk_super_data);
+       kfree(md->private);
+
+       dm_bufio_client_destroy(md->client);
+
+       kfree(md);
+}
+
+
+static int flush_meta_disktable(struct metadata *md)
+{
+       int ret = 0;
+       struct dm_buffer *buf = NULL;
+       void *p = NULL;
+
+       p = dm_bufio_new(md->client, METADATA_SUPERBLOCK_START, &buf);
+       if (unlikely(IS_ERR(p))) {
+               ret = PTR_ERR(p);
+               return ret;
+       }
+
+       memset(p, 0, le64_to_cpu(md->disk_super->metadata_block_size));
+       memcpy(p, md->disk_super, sizeof(struct metadata_superblock));
+
+       if (md->private)
+               memcpy(p + sizeof(struct metadata_superblock),
+                               md->private, md->priv_size);
+
+       dm_bufio_mark_buffer_dirty(buf);
+       dm_bufio_release(buf);
+
+       ret = dm_bufio_write_dirty_buffers(md->client);
+       if (ret)
+               return ret;
+
+       ret = dm_bufio_issue_flush(md->client);
+
+       return ret;
+}
+
+
+/********************************************************
+ *             Private Data Functions                  *
+ ********************************************************/
+
+static int get_private_data_disktable(struct metadata *md, void **priv,
+               uint32_t priv_size)
+{
+       if (!priv_size)
+               return -EINVAL;
+
+       if (md->private)
+               *priv = md->private;
+       else {
+               /* Try to read from saved superblock data */
+               if (md->disk_super_data) {
+                       md->private = kmalloc(priv_size, GFP_KERNEL);
+                       if (!md->private)
+                               return -ENOMEM;
+
+                       memcpy(md->private, md->disk_super_data + sizeof(
+                               struct metadata_superblock), priv_size);
+
+                       md->priv_size = priv_size;
+                       *priv = md->private;
+               } else
+                       return -EINVAL;
+       }
+
+       return 0;
+}
+
+static int set_private_data_disktable(struct metadata *md, void *priv,
+               uint32_t priv_size)
+{
+       if (!priv_size || !priv)
+               return -EINVAL;
+
+       kfree(md->private);
+
+       md->private = kmalloc(priv_size, GFP_KERNEL);
+       if (!md->private)
+               return -ENOMEM;
+
+       memcpy(md->private, priv, priv_size);
+       md->priv_size = priv_size;
+
+       return 0;
+}
+
+/********************************************************
+ *             Space Management Functions              *
+ ********************************************************/
+
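+/*
+ * The data-block space map is an on-disk array of 32-bit reference
+ * counters, one per physical block.  alloc_data_block_disktable() scans
+ * it circularly from allocptr, looking for a counter of zero, and claims
+ * the block by setting that counter to one.
+ */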
+static int alloc_data_block_disktable(struct metadata *md, uint64_t *blockn)
+{
+       uint64_t head, tail, actual_block, prev_block = 0;
+       uint32_t offset_in_block, block_size, first_run = 1;
+       struct dm_buffer *buf = NULL;
+       uint32_t value;
+       void *p = NULL;
+
+       head = tail = md->allocptr;
+       block_size = dm_bufio_get_block_size(md->client);
+
+       do {
+               actual_block = md->smap_start_block + (head *
+                               sizeof(uint32_t)) / block_size;
+               offset_in_block = (head * sizeof(uint32_t)) % block_size;
+
+               /* Only read from disk when a new block is required */
+               if ((actual_block != prev_block) || first_run) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(md->client, actual_block, &buf);
+                       if (unlikely(IS_ERR(p)))
+                               return PTR_ERR(p);
+
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               value = *((uint32_t *)(p + offset_in_block));
+
+               if (!value) {
+                       value = 1;
+                       memcpy(p + offset_in_block, &value, sizeof(uint32_t));
+                       dm_bufio_mark_buffer_dirty(buf);
+                       dm_bufio_release(buf);
+                       *blockn = head;
+                       md->allocptr = (head + 1) % md->smax;
+                       return 0;
+               }
+
+               head = (head + 1) % md->smax;
+
+       } while (head != tail);
+
+       if (buf)
+               dm_bufio_release(buf);
+
+       return -ENOSPC;
+}
+
+static int inc_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+       uint32_t block_size, offset_in_block, value;
+       uint64_t actual_block;
+       struct dm_buffer *buf;
+       void *p;
+
+       if (blockn >= md->smax)
+               return -ERANGE;
+
+       block_size = dm_bufio_get_block_size(md->client);
+       actual_block = md->smap_start_block + (blockn *
+                       sizeof(uint32_t)) / block_size;
+       offset_in_block = (blockn * sizeof(uint32_t)) % block_size;
+
+       p = dm_bufio_read(md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p)))
+               return PTR_ERR(p);
+
+       value = *((uint32_t *)(p + offset_in_block));
+
+       if (value != UINT32_MAX) {
+               value++;
+               memcpy(p + offset_in_block, &value, sizeof(uint32_t));
+               dm_bufio_mark_buffer_dirty(buf);
+               dm_bufio_release(buf);
+       } else {
+               dm_bufio_release(buf);
+               return -E2BIG;
+       }
+
+       return 0;
+}
+
+static int dec_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+       uint32_t block_size, offset_in_block, value;
+       uint64_t actual_block;
+       struct dm_buffer *buf;
+       void *p;
+
+       if (blockn >= md->smax)
+               return -ERANGE;
+
+       block_size = dm_bufio_get_block_size(md->client);
+       actual_block = md->smap_start_block + (blockn *
+                       sizeof(uint32_t)) / block_size;
+       offset_in_block = (blockn * sizeof(uint32_t)) % block_size;
+
+       p = dm_bufio_read(md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p)))
+               return PTR_ERR(p);
+
+       value = *((uint32_t *)(p + offset_in_block));
+
+       if (value) {
+               value--;
+               memcpy(p + offset_in_block, &value, sizeof(uint32_t));
+               dm_bufio_mark_buffer_dirty(buf);
+               dm_bufio_release(buf);
+       } else {
+               dm_bufio_release(buf);
+               return -EFAULT;
+       }
+
+       return 0;
+}
+
+static int get_refcount_disktable(struct metadata *md, uint64_t blockn)
+{
+       uint32_t block_size, offset_in_block, value;
+       uint64_t actual_block;
+       struct dm_buffer *buf;
+       void *p;
+
+       if (blockn >= md->smax)
+               return -ERANGE;
+
+       block_size = dm_bufio_get_block_size(md->client);
+       actual_block = md->smap_start_block + (blockn *
+                       sizeof(uint32_t)) / block_size;
+       offset_in_block = (blockn * sizeof(uint32_t)) % block_size;
+
+       p = dm_bufio_read(md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p)))
+               return PTR_ERR(p);
+
+       value = *((uint32_t *)(p + offset_in_block));
+
+       dm_bufio_release(buf);
+
+       return value;
+}
+
+/********************************************************
+ *             General KVS Functions                   *
+ ********************************************************/
+
+#if 0
+static int kvs_delete_disktable(struct kvstore *kvs, void *key, int32_t ksize)
+{
+       int r;
+
+       r = 0;
+       if (kvs->kvs_delete)
+               r = kvs->kvs_delete(kvs, key, ksize);
+
+       return r;
+}
+
+static int kvs_lookup_disktable(struct kvstore *kvs, void *key,
+                       int32_t ksize, void *value, int32_t *vsize)
+{
+       int r;
+
+       r = 0;
+       if (kvs->kvs_lookup)
+               r = kvs->kvs_lookup(kvs, key, ksize, value, vsize);
+
+       return r;
+}
+
+static int kvs_insert_disktable(struct kvstore *kvs, void *key,
+                       int32_t ksize, void *value, int32_t vsize)
+{
+       int r;
+
+       r = 0;
+       if (kvs->kvs_insert)
+               r = kvs->kvs_insert(kvs, key, ksize, value, vsize);
+
+       return r;
+}
+
+/*
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ *     Make sure that the store is implemented such that
+ *     deletion in-place is safe while iterating.
+ */
+static int kvs_iterate_disktable(struct kvstore *kvs, int (*itr_action)
+                       (void *key, int32_t ksize, void *value, int32_t vsize,
+                       void *data), void *data)
+{
+       int r;
+
+       r = 0;
+       if (kvs->kvs_iterate)
+               r = kvs->kvs_iterate(kvs, itr_action, data);
+
+       return r;
+}
+#endif
+
+/*********************************************************
+ *             Linear KVS Functions                     *
+ *********************************************************/
+
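+/*
+ * The linear KVS treats the 64-bit key (an LBN) as a direct index:
+ * block start_block + key / num_entries_per_block, slot
+ * key % num_entries_per_block.  One presence bit per slot records
+ * whether the slot currently holds a value.
+ */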
+static int kvs_delete_linear_disktable(struct kvstore *kvs,
+               void *key, int32_t ksize)
+{
+       int ret = 0;
+       uint64_t idx, actual_block;
+       uint32_t block_size, offset_in_block, bitmap_offset;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap = NULL;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize != kvs->ksize)
+               return -EINVAL;
+
+       idx = *((uint64_t *)key);
+
+       if (idx > kvdtb->kmax)
+               return -ERANGE;
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+       bitmap_offset = idx % kvdtb->num_entries_per_block;
+       offset_in_block = bitmap_offset * kvs->vsize + kvdtb->bitmap_size;
+       if (actual_block > kvdtb->end_block) {
+               ret = -ERANGE;
+               goto out_bitmap;
+       }
+
+       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p))) {
+               ret = PTR_ERR(p);
+               goto out_bitmap;
+       }
+
+       memcpy(bitmap, p, kvdtb->bitmap_size);
+       if (test_bit(bitmap_offset, bitmap) == 0) {
+               ret = -ENODEV;
+               goto out_buf;
+       }
+
+       bitmap_clear(bitmap, bitmap_offset, 1);
+       memcpy(p, bitmap, kvdtb->bitmap_size);
+       dm_bufio_mark_buffer_dirty(buf);
+
+out_buf:
+       dm_bufio_release(buf);
+out_bitmap:
+       kfree(bitmap);
+       return ret;
+}
+
+/*
+ * 0 - not found
+ * 1 - found
+ * < 0 - error on lookup
+ */
+static int kvs_lookup_linear_disktable(struct kvstore *kvs, void *key,
+               int32_t ksize, void *value, int32_t *vsize)
+{
+       int ret = 1;
+       uint64_t idx, actual_block;
+       uint32_t block_size, offset_in_block, bitmap_offset;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap = NULL;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize != kvs->ksize)
+               return -EINVAL;
+
+       idx = *((uint64_t *)key);
+
+       if (idx > kvdtb->kmax)
+               return -ERANGE;
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+       bitmap_offset = idx % kvdtb->num_entries_per_block;
+       offset_in_block = bitmap_offset * kvs->vsize + kvdtb->bitmap_size;
+       if (actual_block > kvdtb->end_block) {
+               ret = -ERANGE;
+               goto out_bitmap;
+       }
+
+       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p))) {
+               ret = PTR_ERR(p);
+               goto out_bitmap;
+       }
+
+       memcpy(bitmap, p, kvdtb->bitmap_size);
+       if (test_bit(bitmap_offset, bitmap) == 0) {
+               ret = 0;
+               goto out_buf;
+       }
+
+       memcpy(value, p + offset_in_block, kvs->vsize);
+       *vsize = kvs->vsize;
+
+out_buf:
+       dm_bufio_release(buf);
+out_bitmap:
+       kfree(bitmap);
+       return ret;
+}
+
+static int kvs_insert_linear_disktable(struct kvstore *kvs, void *key,
+               int32_t ksize, void *value,
+               int32_t vsize)
+{
+       int ret = 0;
+       uint64_t idx, actual_block;
+       uint32_t block_size, offset_in_block, bitmap_offset;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap = NULL;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize != kvs->ksize)
+               return -EINVAL;
+
+       if (vsize != kvs->vsize)
+               return -EINVAL;
+
+       idx = *((uint64_t *)key);
+
+       if (idx > kvdtb->kmax)
+               return -ERANGE;
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       actual_block = kvdtb->start_block + idx / kvdtb->num_entries_per_block;
+       bitmap_offset = idx % kvdtb->num_entries_per_block;
+       offset_in_block = bitmap_offset * kvs->vsize + kvdtb->bitmap_size;
+       if (actual_block > kvdtb->end_block) {
+               ret = -ERANGE;
+               goto out_bitmap;
+       }
+
+       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+       if (unlikely(IS_ERR(p))) {
+               ret = PTR_ERR(p);
+               goto out_bitmap;
+       }
+
+       memcpy(bitmap, p, kvdtb->bitmap_size);
+
+       bitmap_set(bitmap, bitmap_offset, 1);
+       memcpy(p, bitmap, kvdtb->bitmap_size);
+       memcpy(p + offset_in_block, value, kvs->vsize);
+       dm_bufio_mark_buffer_dirty(buf);
+
+       dm_bufio_release(buf);
+out_bitmap:
+       kfree(bitmap);
+       return ret;
+}
+
+/*
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ *     Make sure that the store is implemented such that
+ *     deletion in-place is safe while iterating.
+ */
+static int kvs_iterate_linear_disktable(struct kvstore *kvs,
+               int (*iteration_action)(void *key, int32_t ksize,
+               void *value, int32_t vsize, void *data), void *data)
+{
+       uint64_t i, actual_block, prev_block = 0;
+       uint32_t block_size, offset_in_block, bitmap_offset;
+       struct dm_buffer *buf = NULL;
+       void *p = NULL;
+       unsigned long *bitmap = NULL;
+       int ret = 0, first_run = 1;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       for (i = 0; i < kvdtb->kmax; i++) {
+               actual_block = kvdtb->start_block + i /
+                       kvdtb->num_entries_per_block;
+               bitmap_offset = i % kvdtb->num_entries_per_block;
+               offset_in_block = bitmap_offset * kvs->vsize +
+                       kvdtb->bitmap_size;
+               if (actual_block > kvdtb->end_block) {
+                       ret = -ERANGE;
+                       goto out;
+               }
+
+               if (first_run || (actual_block != prev_block)) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               ret = PTR_ERR(p);
+                               goto out;
+                       }
+
+                       memcpy(bitmap, p, kvdtb->bitmap_size);
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               if (test_bit(bitmap_offset, bitmap) != 0) {
+                       ret = iteration_action((void *)&i, kvs->ksize,
+                                       p + offset_in_block, kvs->vsize,
+                                       data);
+                       if (ret < 0)
+                               goto out;
+               }
+       }
+
+out:
+       if (buf)
+               dm_bufio_release(buf);
+       kfree(bitmap);
+       return ret;
+}
+
+static struct kvstore *kvs_create_linear_disktable(struct metadata *md,
+               uint32_t ksize, uint32_t vsize, uint32_t kmax, bool unformatted)
+{
+       struct kvstore_disktable *kvs;
+       uint64_t kvstore_size, start, end, i;
+       sector_t metadata_dev_size;
+       uint32_t block_size, num_entries_per_block, bitmap_size, num_blocks;
+       void *p = NULL;
+       struct dm_buffer *buf = NULL;
+
+       if (md->create_new) {
+               if (!vsize || !ksize || !kmax)
+                       return ERR_PTR(-ENOTSUPP);
+
+               /* Currently only 64bit keys are supported */
+               if (ksize != 8)
+                       return ERR_PTR(-ENOTSUPP);
+       }
+
+       /* We do not support two or more KVSs at the moment */
+       if (md->kvs_linear)
+               return ERR_PTR(-EBUSY);
+
+       kvs = kmalloc(sizeof(*kvs), GFP_KERNEL);
+       if (!kvs)
+               return ERR_PTR(-ENOMEM);
+
+       block_size = dm_bufio_get_block_size(md->client);
+
+       if (!md->create_new) {
+               if (le32_to_cpu(md->disk_super->lbn_pcn_vsize) != vsize) {
+                       pr_err("Value size passed does not match value size "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               if (le32_to_cpu(md->disk_super->lbn_pcn_ksize) != ksize) {
+                       pr_err("Key size passed does not match key size "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               if (le32_to_cpu(md->disk_super->lbn_pcn_kmax) != kmax) {
+                       pr_err("Max keys passed does not match max keys "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               start = le64_to_cpu(md->disk_super->lbn_pcn_start);
+               end = le64_to_cpu(md->disk_super->lbn_pcn_end);
+               num_entries_per_block = le32_to_cpu(
+                               md->disk_super->lbn_pcn_num_entries);
+               bitmap_size = le32_to_cpu(
+                               md->disk_super->lbn_pcn_bitmap_size);
+               num_blocks = (kmax / num_entries_per_block) + 1;
+               kvstore_size = num_blocks * block_size;
+       } else {
+               metadata_dev_size = dm_bufio_get_device_size(md->client);
+
+               num_entries_per_block = block_size / vsize;
+               bitmap_size = (block_size - num_entries_per_block * vsize) * 8;
+               while (bitmap_size < num_entries_per_block) {
+                       bitmap_size += vsize * 8;
+                       num_entries_per_block--;
+               }
+
+               /* We want the size in bytes */
+               bitmap_size = bitmap_size / 8;
+
+               num_blocks = (kmax / num_entries_per_block) + 1;
+               kvstore_size = num_blocks * block_size;
+
+               if (((kvstore_size / block_size) +
+                               md->currently_allocated_on_disk_offset) >
+                               metadata_dev_size) {
+                       pr_err("Linear kvs store cannot be created, "
+                                       "metadata device too small");
+                       kfree(kvs);
+                       return ERR_PTR(-ENOMEM);
+               }
+
+               start = md->currently_allocated_on_disk_offset;
+               end = md->currently_allocated_on_disk_offset +
+                       (kvstore_size / block_size) + 1;
+
+               for (i = start; i <= end; i++) {
+                       p = dm_bufio_new(md->client, i, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               kfree(kvs);
+                               return p;
+                       }
+
+                       memset(p, 0, block_size);
+                       dm_bufio_mark_buffer_dirty(buf);
+                       dm_bufio_release(buf);
+               }
+
+               md->disk_super->lbn_pcn_vsize = cpu_to_le32(vsize);
+               md->disk_super->lbn_pcn_ksize = cpu_to_le32(ksize);
+               md->disk_super->lbn_pcn_kmax = cpu_to_le32(kmax);
+               md->disk_super->lbn_pcn_start = cpu_to_le64(start);
+               md->disk_super->lbn_pcn_end = cpu_to_le64(end);
+               md->disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+               md->disk_super->lbn_pcn_num_entries =
+                       cpu_to_le32(num_entries_per_block);
+               md->disk_super->lbn_pcn_bitmap_size = cpu_to_le32(bitmap_size);
+       }
+
+       pr_info("Space required on disk for linear key value store: "
+                       "%llu.%06llu MB\n", kvstore_size / (1024 * 1024),
+                       kvstore_size - ((kvstore_size / (1024 * 1024))
+                               * (1024 * 1024)));
+
+       kvs->ckvs.vsize = vsize;
+       kvs->ckvs.ksize = ksize;
+       kvs->kmax = kmax;
+       kvs->start_block = start;
+       kvs->end_block = end;
+       kvs->bitmap_size = bitmap_size;
+       kvs->num_entries_per_block = num_entries_per_block;
+       kvs->md = md;
+
+       kvs->ckvs.kvs_insert = kvs_insert_linear_disktable;
+       kvs->ckvs.kvs_lookup = kvs_lookup_linear_disktable;
+       kvs->ckvs.kvs_delete = kvs_delete_linear_disktable;
+       kvs->ckvs.kvs_iterate = kvs_iterate_linear_disktable;
+       md->kvs_linear = kvs;
+       md->currently_allocated_on_disk_offset =
+               le64_to_cpu(md->disk_super->metadata_nr_blocks);
+
+       return &(kvs->ckvs);
+}
+
+/********************************************************
+ *             Sparse KVS Functions                    *
+ ********************************************************/
+
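+/*
+ * The sparse KVS is an open-addressed hash table with linear probing
+ * starting at slot (key % kmax).  Each slot stores the full key followed
+ * by the value and uses two bitmap bits: bit 2*i marks the slot in use,
+ * bit 2*i + 1 marks it deleted (a tombstone that lookups skip and
+ * inserts may reuse).
+ */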
+static int kvs_delete_sparse_disktable(struct kvstore *kvs,
+               void *key, int32_t ksize)
+{
+       uint64_t idxhead = *((uint64_t *)key);
+       uint32_t entry_size, head, tail, offset_in_block, block_size;
+       uint32_t first_run = 1, bitmap_offset;
+       int ret = 0;
+       uint64_t actual_block, prev_block = 0;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize != kvs->ksize)
+               return -EINVAL;
+
+       entry_size = kvs->vsize + kvs->ksize;
+       head = idxhead % kvdtb->kmax;
+       tail = head;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       do {
+               actual_block = kvdtb->start_block + head /
+                       kvdtb->num_entries_per_block;
+               bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+               offset_in_block = (head % kvdtb->num_entries_per_block) *
+                       entry_size + kvdtb->bitmap_size;
+
+               if (actual_block > kvdtb->end_block) {
+                       ret = -ERANGE;
+                       goto out;
+               }
+
+               if (first_run || (actual_block != prev_block)) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               ret = PTR_ERR(p);
+                               goto out;
+                       }
+
+                       memcpy(bitmap, p, kvdtb->bitmap_size);
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               if (test_bit(bitmap_offset, bitmap) == 0) {
+                       ret = -ENODEV;
+                       goto out;
+               }
+
+               if (memcmp(p + offset_in_block, key, kvs->ksize))
+                       head = (head + 1) % kvdtb->kmax;
+               else {
+                       bitmap_set(bitmap, bitmap_offset + 1, 1);
+                       memcpy(p, bitmap, kvdtb->bitmap_size);
+
+                       dm_bufio_mark_buffer_dirty(buf);
+
+                       ret = 0;
+                       goto out;
+               }
+       } while (head != tail);
+
+       ret = -ENODEV;
+
+out:
+       if (buf)
+               dm_bufio_release(buf);
+       kfree(bitmap);
+       return ret;
+}
+
+/*
+ * 0 - not found
+ * 1 - found
+ * < 0 - error on lookup
+ */
+static int kvs_lookup_sparse_disktable(struct kvstore *kvs, void *key,
+               int32_t ksize, void *value, int32_t *vsize)
+{
+       uint64_t idxhead = *((uint64_t *)key);
+       uint32_t entry_size, head, tail, offset_in_block, block_size;
+       uint32_t first_run = 1, bitmap_offset;
+       uint64_t actual_block, prev_block = 0;
+       int ret = 0;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize != kvs->ksize)
+               return -EINVAL;
+
+       entry_size = kvs->vsize + kvs->ksize;
+       head = idxhead % kvdtb->kmax;
+       tail = head;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       do {
+               actual_block = kvdtb->start_block + head /
+                       kvdtb->num_entries_per_block;
+               bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+               offset_in_block = (head % kvdtb->num_entries_per_block) *
+                       entry_size + kvdtb->bitmap_size;
+
+               if (actual_block > kvdtb->end_block) {
+                       ret = -ERANGE;
+                       goto out;
+               }
+
+               if (first_run || (actual_block != prev_block)) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               ret = PTR_ERR(p);
+                               goto out;
+                       }
+
+                       memcpy(bitmap, p, kvdtb->bitmap_size);
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               if (test_bit(bitmap_offset, bitmap) == 0) {
+                       ret = 0;
+                       goto out;
+               }
+
+               if (test_bit(bitmap_offset + 1, bitmap) != 0) {
+                       head = (head + 1) % kvdtb->kmax;
+                       continue;
+               }
+
+               if (memcmp(p + offset_in_block, key, kvs->ksize))
+                       head = (head + 1) % kvdtb->kmax;
+               else {
+                       memcpy(value, p + offset_in_block + kvs->ksize,
+                                       kvs->vsize);
+
+                       *vsize = kvs->vsize;
+
+                       ret = 1;
+                       goto out;
+               }
+
+       } while (head != tail);
+
+       ret = 0;
+
+out:
+       if (buf)
+               dm_bufio_release(buf);
+       kfree(bitmap);
+       return ret;
+}
+
+static int kvs_insert_sparse_disktable(struct kvstore *kvs, void *key,
+               int32_t ksize, void *value, int32_t vsize)
+{
+       uint64_t idxhead = *((uint64_t *)key);
+       uint32_t entry_size, head, tail, offset_in_block, block_size;
+       uint32_t first_run = 1, bitmap_offset;
+       uint64_t actual_block, prev_block = 0;
+       struct dm_buffer *buf = NULL;
+       int ret = 0;
+       unsigned long *bitmap;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       if (ksize > kvs->ksize)
+               return -EINVAL;
+
+       entry_size = kvs->vsize + kvs->ksize;
+       head = idxhead % kvdtb->kmax;
+       tail = head;
+
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       do {
+               actual_block = kvdtb->start_block + head /
+                       kvdtb->num_entries_per_block;
+               bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+               offset_in_block = (head % kvdtb->num_entries_per_block) *
+                       entry_size + kvdtb->bitmap_size;
+
+               if (actual_block > kvdtb->end_block) {
+                       ret = -ERANGE;
+                       goto out;
+               }
+
+               if (first_run || (actual_block != prev_block)) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(kvdtb->md->client, actual_block, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               ret = PTR_ERR(p);
+                               goto out;
+                       }
+
+                       memcpy(bitmap, p, kvdtb->bitmap_size);
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               if ((test_bit(bitmap_offset, bitmap) == 0) ||
+                               (test_bit(bitmap_offset + 1, bitmap) != 0)) {
+                       memcpy(p + offset_in_block, key, kvs->ksize);
+                       memcpy(p + offset_in_block + kvs->ksize, value,
+                                       kvs->vsize);
+
+                       bitmap_set(bitmap, bitmap_offset, 1);
+                       bitmap_clear(bitmap, bitmap_offset + 1, 1);
+                       memcpy(p, bitmap, kvdtb->bitmap_size);
+
+                       dm_bufio_mark_buffer_dirty(buf);
+
+                       ret = 0;
+                       goto out;
+               }
+
+               head = (head + 1) % kvdtb->kmax;
+
+       } while (head != tail);
+
+       ret = -ENOSPC;
+
+out:
+       if (buf)
+               dm_bufio_release(buf);
+       kfree(bitmap);
+       return ret;
+}
+
+/*
+ *
+ * NOTE: if iteration_action() is a deletion/cleanup function,
+ *      Make sure that the store is implemented such that
+ *      deletion in-place is safe while iterating.
+ */
+static int kvs_iterate_sparse_disktable(struct kvstore *kvs,
+               int (*iteration_action)(void *key, int32_t ksize,
+               void *value, int32_t vsize, void *data), void *data)
+{
+       int err = 0;
+       uint32_t entry_size, head = 0, block_size, offset_in_block;
+       uint32_t first_run = 1, bitmap_offset = 0;
+       uint64_t actual_block, prev_block = 0;
+       struct dm_buffer *buf = NULL;
+       unsigned long *bitmap;
+       void *p = NULL;
+       struct kvstore_disktable *kvdtb = NULL;
+
+       BUG_ON(!kvs);
+
+       kvdtb = container_of(kvs, struct kvstore_disktable, ckvs);
+
+       entry_size = kvs->vsize + kvs->ksize;
+       block_size = dm_bufio_get_block_size(kvdtb->md->client);
+
+       bitmap = kmalloc(kvdtb->bitmap_size, GFP_KERNEL);
+       if (!bitmap)
+               return -ENOMEM;
+
+       do {
+               actual_block = kvdtb->start_block + head /
+                       kvdtb->num_entries_per_block;
+               bitmap_offset = (head % kvdtb->num_entries_per_block) * 2;
+               offset_in_block = (head % kvdtb->num_entries_per_block) *
+                       entry_size + kvdtb->bitmap_size;
+
+               if (actual_block > kvdtb->end_block) {
+                       err = -ERANGE;
+                       goto out;
+               }
+
+               if (first_run || (actual_block != prev_block)) {
+                       if (buf && !first_run) {
+                               dm_bufio_release(buf);
+                               buf = NULL;
+                       }
+
+                       p = dm_bufio_read(kvdtb->md->client, actual_block,
+                                       &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               err = PTR_ERR(p);
+                               goto out;
+                       }
+
+                       memcpy(bitmap, p, kvdtb->bitmap_size);
+                       prev_block = actual_block;
+                       first_run = 0;
+               }
+
+               if ((test_bit(bitmap_offset, bitmap) != 0) &&
+                       (test_bit(bitmap_offset + 1, bitmap) == 0)) {
+                       err = iteration_action(p + offset_in_block,
+                                       kvs->ksize,
+                                       p + kvs->ksize + offset_in_block,
+                                       kvs->vsize, data);
+
+                       if (err < 0)
+                               goto out;
+               }
+
+               head = (head + 1) % kvdtb->kmax;
+       } while (head);
+
+out:
+       if (buf)
+               dm_bufio_release(buf);
+       kfree(bitmap);
+       return err;
+}
+
+static struct kvstore *kvs_create_sparse_disktable(struct metadata *md,
+               uint32_t ksize, uint32_t vsize, uint32_t knummax,
+               bool unformatted)
+{
+       struct kvstore_disktable *kvs;
+       uint64_t kvstore_size, metadata_dev_size, start, end, i, num_blocks;
+       uint32_t block_size, num_entries_per_block, bitmap_size;
+       struct dm_buffer *buf = NULL;
+       void *p;
+
+       if (md->create_new) {
+               if (!vsize || !ksize || !knummax)
+                       return ERR_PTR(-ENOTSUPP);
+       }
+
+       /* We do not support two or more KVSs at the moment */
+       if (md->kvs_sparse)
+               return ERR_PTR(-EBUSY);
+
+       kvs = kmalloc(sizeof(*kvs), GFP_KERNEL);
+       if (!kvs)
+               return ERR_PTR(-ENOMEM);
+
+       block_size = dm_bufio_get_block_size(md->client);
+
+       if (!md->create_new) {
+               if (le32_to_cpu(md->disk_super->hash_pcn_vsize) != vsize) {
+                       pr_err("Value size passed does not match value size "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               if (le32_to_cpu(md->disk_super->hash_pcn_ksize) != ksize) {
+                       pr_err("Key size passed does not match key size "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               if (le32_to_cpu(md->disk_super->hash_pcn_kmax) != knummax) {
+                       pr_err("Max keys passed does not match max keys "
+                                       "stored on metadata disk");
+                       kfree(kvs);
+                       return ERR_PTR(-EINVAL);
+               }
+
+               start = le64_to_cpu(md->disk_super->hash_pcn_start);
+               end = le64_to_cpu(md->disk_super->hash_pcn_end);
+               num_entries_per_block =
+                       le32_to_cpu(md->disk_super->hash_pcn_num_entries);
+               bitmap_size =
+                       le32_to_cpu(md->disk_super->hash_pcn_bitmap_size);
+               num_blocks = (knummax / num_entries_per_block) + 1;
+               kvstore_size = num_blocks * block_size;
+       } else {
+               metadata_dev_size = dm_bufio_get_device_size(md->client);
+
+               num_entries_per_block = block_size / (vsize + ksize);
+
+               bitmap_size = (block_size - num_entries_per_block *
+                               (vsize + ksize)) * 8;
+
+               while (bitmap_size < (num_entries_per_block * 2)) {
+                       bitmap_size += (vsize + ksize) * 8;
+                       num_entries_per_block--;
+               }
+
+               /* We want the size in bytes */
+               bitmap_size = bitmap_size / 8;
+
+               num_blocks = (knummax / num_entries_per_block) + 1;
+               kvstore_size = num_blocks * block_size;
+
+               if (((kvstore_size / block_size) +
+                               md->currently_allocated_on_disk_offset) >
+                               metadata_dev_size) {
+                       pr_err("Sparse kvs store cannot be created, "
+                                       "metadata device too small");
+                       kfree(kvs);
+                       return ERR_PTR(-ENOMEM);
+               }
+
+               start = md->currently_allocated_on_disk_offset;
+               end = md->currently_allocated_on_disk_offset +
+                       (kvstore_size / block_size) + 1;
+
+               for (i = start; i < end; i++) {
+                       p = dm_bufio_new(md->client, i, &buf);
+                       if (unlikely(IS_ERR(p))) {
+                               kfree(kvs);
+                               return p;
+                       }
+
+                       memset(p, 0, block_size);
+                       dm_bufio_mark_buffer_dirty(buf);
+                       dm_bufio_release(buf);
+               }
+
+               md->disk_super->hash_pcn_vsize = cpu_to_le32(vsize);
+               md->disk_super->hash_pcn_ksize = cpu_to_le32(ksize);
+               md->disk_super->hash_pcn_kmax = cpu_to_le32(knummax);
+               md->disk_super->hash_pcn_start = cpu_to_le64(start);
+               md->disk_super->hash_pcn_end = cpu_to_le64(end);
+               md->disk_super->hash_pcn_num_entries =
+                       cpu_to_le32(num_entries_per_block);
+               md->disk_super->hash_pcn_bitmap_size =
+                       cpu_to_le32(bitmap_size);
+               md->disk_super->metadata_nr_blocks = cpu_to_le64(end + 1);
+       }
+
+       pr_info("Space required on disk for sparse key value store: "
+                       "%llu.%06llu MB\n", kvstore_size / (1024 * 1024),
+                       kvstore_size - ((kvstore_size / (1024 * 1024))
+                               * (1024 * 1024)));
+
+       kvs->ckvs.vsize = vsize;
+       kvs->ckvs.ksize = ksize;
+       kvs->kmax = knummax;
+       kvs->start_block = start;
+       kvs->end_block = end;
+       kvs->num_entries_per_block = num_entries_per_block;
+       kvs->bitmap_size = bitmap_size;
+       kvs->md = md;
+
+       kvs->ckvs.kvs_insert = kvs_insert_sparse_disktable;
+       kvs->ckvs.kvs_lookup = kvs_lookup_sparse_disktable;
+       kvs->ckvs.kvs_delete = kvs_delete_sparse_disktable;
+       kvs->ckvs.kvs_iterate = kvs_iterate_sparse_disktable;
+       md->currently_allocated_on_disk_offset =
+               le64_to_cpu(md->disk_super->metadata_nr_blocks);
+
+       md->kvs_sparse = kvs;
+
+       return &(kvs->ckvs);
+}
+
+static void flush_bufio_cache_disktable(struct metadata *md)
+{
+       drop_buffers_user(md->client);
+}
+
+struct metadata_ops metadata_ops_disktable = {
+       .init_meta = init_meta_disktable,
+       .exit_meta = exit_meta_disktable,
+       .kvs_create_linear = kvs_create_linear_disktable,
+       .kvs_create_sparse = kvs_create_sparse_disktable,
+
+       .alloc_data_block = alloc_data_block_disktable,
+       .inc_refcount = inc_refcount_disktable,
+       .dec_refcount = dec_refcount_disktable,
+       .get_refcount = get_refcount_disktable,
+
+       .flush_meta = flush_meta_disktable,
+
+       .get_private_data = get_private_data_disktable,
+       .set_private_data = set_private_data_disktable,
+
+       .flush_bufio_cache = flush_bufio_cache_disktable,
+};
diff --git a/drivers/md/dm-dedup-dtb.h b/drivers/md/dm-dedup-dtb.h
new file mode 100644 (file)
index 0000000..ab7ffcf
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2012-2013 File systems and Storage Lab (FSL)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ */
+
+#ifndef DISK_TABLE_BACKEND_H
+#define DISK_TABLE_BACKEND_H
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/device-mapper.h>
+#include <linux/dm-io.h>
+#include <linux/dm-kcopyd.h>
+#include <linux/list.h>
+#include <linux/err.h>
+#include <asm/current.h>
+#include <linux/string.h>
+#include <linux/gfp.h>
+
+#include <linux/scatterlist.h>
+#include <asm/page.h>
+#include <asm/unaligned.h>
+#include <crypto/hash.h>
+#include <crypto/md5.h>
+#include <crypto/algapi.h>
+
+#include "dm-dedup-target.h"
+
+extern struct metadata_ops metadata_ops_disktable;
+
+struct init_param_disktable {
+       struct block_device *metadata_dev; /* metadata block device */
+       uint64_t blocks; /* number of physical data blocks */
+       int reconstruct_meta; /* 0: format new, 1: reuse superblock */
+};
+
+#endif /* DISK_TABLE_BACKEND_H */
index dc766c1dade894a9dfb52a986ee50b1981a46552..7570de187161cdf7a278a6a065732a44cbb8ce14 100644 (file)
@@ -18,6 +18,7 @@
 #include "dm-dedup-backend.h"
 #include "dm-dedup-ram.h"
 #include "dm-dedup-cbt.h"
+#include "dm-dedup-dtb.h"
 #include "dm-dedup-kvstore.h"
 
 #define MAX_DEV_NAME_LEN (64)
@@ -40,9 +41,17 @@ struct dedup_work {
        struct bio *bio;
 };
 
+struct mark_and_sweep_data {
+       unsigned long *bitmap;
+       uint64_t bitmap_len;
+       uint64_t cleanup_count; /* number of hashes cleaned up */
+       struct dedup_config *dc;
+};
+
 enum backend {
        BKND_INRAM,
-       BKND_COWBTREE
+       BKND_COWBTREE,
+       BKND_DISKTABLE
 };
 
 static void bio_zero_endio(struct bio *bio)
@@ -511,6 +520,8 @@ static int parse_backend(struct dedup_args *da, struct dm_arg_set *as,
                da->backend = BKND_INRAM;
        else if (!strcmp(backend, "cowbtree"))
                da->backend = BKND_COWBTREE;
+       else if (!strcmp(backend, "disktable"))
+               da->backend = BKND_DISKTABLE;
        else {
                *err = "Unsupported metadata backend";
                return -EINVAL;
@@ -593,6 +604,7 @@ static int dm_dedup_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
        struct init_param_inram iparam_inram;
        struct init_param_cowbtree iparam_cowbtree;
+       struct init_param_disktable iparam_disktable;
        void *iparam = NULL;
        struct metadata *md = NULL;
 
@@ -666,6 +678,13 @@ static int dm_dedup_ctr(struct dm_target *ti, unsigned int argc, char **argv)
                iparam_cowbtree.blocks = dc->pblocks;
                iparam_cowbtree.metadata_bdev = da.meta_dev->bdev;
                iparam = &iparam_cowbtree;
+               break;
+       case BKND_DISKTABLE:
+               dc->mdops = &metadata_ops_disktable;
+               iparam_disktable.blocks = dc->pblocks;
+               iparam_disktable.metadata_dev = da.meta_dev->bdev;
+               /* For now we never reconstruct the metadata */
+               iparam_disktable.reconstruct_meta = 0;
+               iparam = &iparam_disktable;
        }
 
        strcpy(dc->backend_str, da.backend_str);
@@ -842,29 +861,51 @@ static void dm_dedup_status(struct dm_target *ti, status_type_t status_type,
        }
 }
 
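+/*
+ * Garbage collection is done mark-and-sweep style: mark_lbn_pbn_bitmap()
+ * sets a bit for every PBN still referenced by the LBN->PBN store, and
+ * cleanup_hash_pbn() then removes HASH->PBN entries (and drops their
+ * refcounts) for PBNs whose bit was left unset.
+ */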
+static int mark_lbn_pbn_bitmap(void *key, int32_t ksize,
+                              void *value, int32_t vsize, void *data)
+{
+       int ret = 0;
+       struct mark_and_sweep_data *ms_data =
+               (struct mark_and_sweep_data *)data;
+       uint64_t pbn_val = *((uint64_t *)value);
+
+       BUG_ON(!data);
+       BUG_ON(!ms_data->bitmap);
+       BUG_ON(pbn_val >= ms_data->bitmap_len);
+
+       bitmap_set(ms_data->bitmap, pbn_val, 1);
+
+       return ret;
+}
+
 static int cleanup_hash_pbn(void *key, int32_t ksize, void *value,
                            int32_t vsize, void *data)
 {
        int r = 0;
        uint64_t pbn_val = 0;
+       struct mark_and_sweep_data *ms_data =
+               (struct mark_and_sweep_data *)data;
        struct hash_pbn_value hashpbn_value = *((struct hash_pbn_value *)value);
-       struct dedup_config *dc = (struct dedup_config *)data;
+       struct dedup_config *dc = ms_data->dc;
 
        BUG_ON(!data);
+       BUG_ON(!ms_data->bitmap);
 
        pbn_val = hashpbn_value.pbn;
+       BUG_ON(pbn_val >= ms_data->bitmap_len);
 
-       if (dc->mdops->get_refcount(dc->bmd, pbn_val) == 1) {
+       if (test_bit(pbn_val, ms_data->bitmap) == 0) {
                r = dc->kvs_hash_pbn->kvs_delete(dc->kvs_hash_pbn,
                                                        key, ksize);
                if (r < 0)
                        goto out;
 
-               r = dc->mdops->dec_refcount(dc->bmd, pbn_val);
+               r = dc->mdops->dec_refcount(ms_data->dc->bmd, pbn_val);
                if (r < 0)
                        goto out_dec_refcount;
 
                dc->physical_block_counter -= 1;
+               ms_data->cleanup_count++;
        }
 
        goto out;
@@ -880,13 +921,48 @@ out:
 static int garbage_collect(struct dedup_config *dc)
 {
        int err = 0;
+       sector_t data_size = 0;
+       uint64_t bitmap_size = 0;
+       struct mark_and_sweep_data ms_data;
 
        BUG_ON(!dc);
 
-       /* Cleanup hashes if the refcount of block == 1 */
+       data_size = i_size_read(dc->data_dev->bdev->bd_inode) >> SECTOR_SHIFT;
+       (void) sector_div(data_size, dc->sectors_per_block);
+       bitmap_size = data_size;
+
+       memset(&ms_data, 0, sizeof(struct mark_and_sweep_data));
+
+       ms_data.bitmap = vmalloc(BITS_TO_LONGS(bitmap_size) *
+                       sizeof(unsigned long));
+       if (!ms_data.bitmap) {
+               DMERR("Could not vmalloc ms_data.bitmap");
+               err = -ENOMEM;
+               goto out;
+       }
+       bitmap_zero(ms_data.bitmap, bitmap_size);
+
+       ms_data.bitmap_len = bitmap_size;
+       ms_data.cleanup_count = 0;
+       ms_data.dc = dc;
+
+       /* Create bitmap of used pbn blocks */
+       err = dc->kvs_lbn_pbn->kvs_iterate(dc->kvs_lbn_pbn,
+                       &mark_lbn_pbn_bitmap, (void *)&ms_data);
+       if (err < 0)
+               goto out_free;
+
+       /* Cleanup hashes based on above bitmap of used pbn blocks */
        err = dc->kvs_hash_pbn->kvs_iterate(dc->kvs_hash_pbn,
-                       &cleanup_hash_pbn, (void *)dc);
+                       &cleanup_hash_pbn, (void *)&ms_data);
+       if (err < 0)
+               goto out_free;
 
+       /* physical_block_counter is decremented in cleanup_hash_pbn() */
+
+out_free:
+       vfree(ms_data.bitmap);
+out:
        return err;
 }