#ifndef PERSIBPLUSTREE_H
#define PERSIBPLUSTREE_H

#include "persifs.h"
#include "marshal.h"
#include "happyio.h"
#include "bplustree.h"

#include <qhash.h>
#include <list.h>

/*
 * branchT (sometimes called T in the comments) is the minimum
 * branching factor of the tree. We follow the CLRS definition (*not*
 * the Knuth definition): every internal non-root node contains
 * between T-1 and 2T-1 routing elements (inclusive), and hence
 * between T and 2T children.
 */

/**
 * Internal representation:
 *
 * Header:
 *  magic (BPLUSTREE_MAGIC_NUMBER)           4 bytes
 *  branchT                                  4 bytes
 *  <reserved>                               4 bytes
 *  last timestamp                           4 bytes
 *
 * Internal node:
 *  magic (BPLUS_INTERNAL_MAGIC_NUMBER)      4 bytes
 *  timestamp                                4 bytes
 *  n = # of elements (0 <= x <= 2*T - 1)    4 bytes
 *  marshalled keys (n strings)              variable  * n
 *  n+1 children (offsets)                   4 bytes   * n+1
 *  zero-padding
 *    To be padded to length: 12 + 8T + 2TK bytes, where
 *    K = 4 + the max length of a marshalled key
 *  Second half (modification box)
 *  magic (BPLUS_INTERNAL_MBOX_MAGIC_NUMBER) 4 bytes
 *  timestamp                                4 bytes
 *  n = # of elements (0 <= x <= 2*T - 1)    4 bytes
 *  marshalled keys (n strings)              variable  * n
 *  n+1 children (offsets)                   4 bytes   * n+1
 *  zero-padding
 *    To be padded to length: 12 + 8T + 2TK bytes, where
 *    K = 4 + the max length of a marshalled key
 *  If the mbox's timestamp is zero, the mbox is unused.
 *  
 *
 * Leaf node:
 *  magic (BPLUS_LEAF_MAGIC_NUMBER)          4 bytes
 *  marshalled key (string)                  variable
 *  marshalled value (string)                variable
 */

#define PBPT_DEBUG 0

typedef long treeTimestamp;

const unsigned long PBPLUSTREE_MAGIC_NUMBER = 0xc3427dda;
const unsigned long PBPLUS_INTERNAL_MAGIC_NUMBER = 0x8c936b58;
const unsigned long PBPLUS_INTERNAL_MBOX_MAGIC_NUMBER = 0x3850380f;
const unsigned long PBPLUS_LEAF_MAGIC_NUMBER = 0x21318ba5;

const unsigned long CACHE_MBOX = 0x80000000;



template<>
struct marshaller<treeTimestamp>
{
  marshaller() {}

  str operator() (const treeTimestamp &t) const
  {
    strWriter w;

    w.write(&t, sizeof(treeTimestamp));

    return w.getStr();
  }

  unsigned long maxLength() const
  {
    return sizeof(treeTimestamp);
  }
};

template<>
struct unmarshaller<treeTimestamp>
{
  unmarshaller() {}

  treeTimestamp operator() (str s) const
  {
    strReader r(s);

    treeTimestamp ts;
    r.read(&ts, sizeof(treeTimestamp));
    return ts;
  }
};

struct PBPlusInternalNode
{
  unsigned long numElements;
  str *keys;
  unsigned long *children;
  unsigned long branchT;
  unsigned long maxKeyLength;
  unsigned long pos;
  bool mbox;                    // True if this is the "modification box"
  treeTimestamp ts;
  bool writable;                // Should be strictly enforced, but isn't.
  tailq_entry<PBPlusInternalNode> anc_qent;
  tailq_entry<PBPlusInternalNode> cache_qent;
  
  static PBPlusInternalNode *
  create(unsigned long branchT, unsigned long maxKeyLength,
                unsigned long pos, bool mbox, treeTimestamp ts) {
    PBPlusInternalNode *n = New PBPlusInternalNode();
    n->branchT = branchT;
    n->maxKeyLength = maxKeyLength;
    n->pos = pos;
    n->numElements = 0;
    n->keys = New str[2*branchT-1];
    n->children = New unsigned long [2*branchT];
    n->mbox = mbox;
    n->ts = ts;
    return n;
  }

  // XXX Ignore the copy constructor and destructor. This makes
  // manually destructing the keys and children necessary, but for
  // some reason prevents a massive memory blowup. I can't explain
  // this, nor do I want to.
  
  ~PBPlusInternalNode() {
//     delete [] keys;
//     delete [] children;
  }

//   PBPlusInternalNode(PBPlusInternalNode & n) {
//     numElements = n.numElements;
//     branchT = n.branchT;
//     pos = n.pos;
//     mbox = n.mbox;
//     ts = n.ts;
//     keys = New str[2*branchT-1];
//     children = New unsigned long [2*branchT];

//     for (unsigned long i = 0; i < numElements; i++) {
//       keys[i] = n.keys[i];
//     }
//     for (unsigned long i = 0; i < numElements+1; i++) {
//       children[i] = n.children[i];
//     }
//     warn << "copy constructing\n";
//   }
  
    
  static PBPlusInternalNode *
  load(streamReader & r, unsigned long branchT,
              unsigned long maxKeyLength, unsigned long pos, bool mbox) {
    PBPlusInternalNode *n = New PBPlusInternalNode();
    n->branchT = branchT;
    n->maxKeyLength = maxKeyLength;
    n->pos = pos;
    n->mbox = mbox;

    if (r.readLong() != (mbox ? PBPLUS_INTERNAL_MBOX_MAGIC_NUMBER
                              : PBPLUS_INTERNAL_MAGIC_NUMBER)) {
      fatal << "PBPlusInternalNode read: bad magic number"
            << (mbox ? str(" for mbox ") : str("")) << "\n";
    }

    n->ts = r.readLong();
    n->numElements = r.readLong();
    if (r.hasError() || r.eos()) {
      fatal << "PBPlusInternalNode read: unexpected eos or error\n";
    }
    if (n->numElements > 2*branchT-1) {
      // Should also be >= branchT - 1, but the root can violate
      // this.
      fatal << "PBPlusInternalNode read: invalid number of elements "
            << n->numElements << "\n";
    }

    n->keys = New str[2*branchT-1];
    for (unsigned long i = 0; i < n->numElements; i++) {
      n->keys[i] = r.readStr();
      if (r.hasError() || r.eos()) {
        fatal << "PBPlusInternalNode read: unexpected eos or error\n";
      }
    }

    n->children = New unsigned long [2*branchT];
    for (unsigned long i = 0; i < n->numElements + 1; i++) {
      n->children[i] = r.readLong();
      if (r.hasError() || r.eos()) {
        fatal << "PBPlusInternalNode read: unexpected eos or error\n";
      }
    }
    
    return n; 
  }

  unsigned long padLen() {
    return padLen(branchT, maxKeyLength);
  }
  
  static unsigned long padLen(unsigned long t, unsigned long k) {
    return 3 * (sizeof(unsigned long))
      + 4 * t * sizeof(unsigned long)
      + 2 * t * (k + sizeof(unsigned long));
  }

  str marshal() {
    strWriter s;
    if (mbox) {
      s.writeLong(PBPLUS_INTERNAL_MBOX_MAGIC_NUMBER);
    } else {
      s.writeLong(PBPLUS_INTERNAL_MAGIC_NUMBER); 
    }
    s.writeLong(ts);
    s.writeLong(numElements);
    for (unsigned long i = 0; i < numElements; i++) {
      s.writeStr(keys[i]);
    }
    for (unsigned long i = 0; i < numElements+1; i++) {
      s.writeLong(children[i]);
    }
    if (s.hasError()) {
      fatal << "PBPlusInternalNode: marshalling error\n";
    }

    // Zero pad
    unsigned long padLength = padLen()
      - s.getStr().len();
    for(unsigned long i = 0; i < padLength; i++) {
      s.writeByte(0);
    }
    
    return s.getStr();
  }

//private:
  PBPlusInternalNode() {
    writable = false;
  }
  
};

typedef tailq<PBPlusInternalNode,
              &PBPlusInternalNode::anc_qent> ancestorQueue;


struct PBPlusLeafNode
{
  str key;
  str value;
  unsigned long pos;

  PBPlusLeafNode(streamReader & r, unsigned long branchT,
                unsigned long maxKeyLength, unsigned long pos)
    : pos(pos) {
    if (r.readLong() != PBPLUS_LEAF_MAGIC_NUMBER) {
      fatal << "PBPlusLeafNode read: bad magic number\n";
    }
    key = r.readStr();
    value = r.readStr();
    if (r.hasError() || r.eos()) {
      fatal << "PBPlusLeafNode read: unexpected eos or error\n";
    }
  }

  PBPlusLeafNode(unsigned long branchT, unsigned long maxKeyLength,
                unsigned long pos)
    : pos(pos) {
  }

  str marshal() {
    strWriter w;
    w.writeLong(PBPLUS_LEAF_MAGIC_NUMBER);
    w.writeStr(key);
    w.writeStr(value);
    if (w.hasError()) {
      fatal << "PBPlusLeafNode: marshalling error\n";
    }
    return w.getStr();
  }
};



template<class K, class V,
         class MK = marshaller<K>, class MV = marshaller<V>,
         class UMK = unmarshaller<K>, class UMV = unmarshaller<V> >
class PersiBPlusTree
{
  
public:
  static void create(str filename, str rootMapFilename,
                     unsigned long branchT, unsigned long maxCacheSize,
                     callback<void,
                     ref<PersiBPlusTree<K,V,MK,MV,UMK,UMV> > >::ref cb,
                     cbe error)  {

    BPlusTree<treeTimestamp,
              unsigned long>::create(rootMapFilename, branchT,
                                     wrap(create_after_rootMapCreate,
                                          filename, branchT, maxCacheSize,
                                          cb, error),
                                     error);
  }

  static void create_after_rootMapCreate(
    str filename, unsigned long branchT, unsigned long maxCacheSize,
    callback<void,ref<PersiBPlusTree<K,V,MK,MV,UMK,UMV> > >::ref cb,
    cbe error, ref<BPlusTree<treeTimestamp,unsigned long> > rootMap )  {

      bool initialize;
      
      if (access(filename.cstr(), F_OK) == 0) {
        // File exists
        initialize = false;
      } else {
        initialize = true;
        // XXX Should probably make sure root map doesn't exist or
        // unlink it, but...
      }


      
    FILE *f = fopen(filename.cstr(),
                    initialize ? "w+b" : "r+b");
    if (!f) {
      warn << "PersiBPlusTree: failed to open " << filename << "\n";
      error(PERR_IO);
      return;
    }

    unsigned long latestRootPos;
    
    if (initialize) {
      fileWriter w(f);
      w.writeLong(PBPLUSTREE_MAGIC_NUMBER);
      w.writeLong(branchT);
      unsigned long rootPos = ftell(f) + 2*sizeof(unsigned long);
      w.writeLong(rootPos);     // Ignored.
      w.writeLong(1);

      rootMap->insert(1, rootPos);
      latestRootPos = rootPos;
      
      // Create a new root
      // Need to have a marshaller in order to get maxLength
      MK mK;
      PBPlusInternalNode * root = PBPlusInternalNode::create(branchT,
                                                             mK.maxLength(),
                                                             ftell(f),
                                                             false, 1);
      root->numElements = 0;
      root->children[0] = 0;
      fseek(f, rootPos, SEEK_SET);
      w.writeStrWithoutLen(root->marshal());
      
      PBPlusInternalNode * rootMBox = PBPlusInternalNode::create(branchT,
                                                                 mK.maxLength(),
                                                                 ftell(f),
                                                                 true, 0);
      rootMBox->numElements = 0;
      rootMBox->children[0] = 0;
      fseek(f, rootPos + PBPlusInternalNode::padLen(branchT, mK.maxLength()),
            SEEK_SET);
#if PBPT_DEBUG
      warn << "writing root mbox at " << ftell(f) << "\n";
#endif
      w.writeStrWithoutLen(rootMBox->marshal());

      if (w.hasError() || fflush(f) || fseek(f, 0, SEEK_SET)) {
        warn << "PersiBPlusTree: failed to initialize new tree file\n";
        error(PERR_IO);
        return;
      }
      delete root;

    } else {

      latestRootPos = *rootMap->syncPredecessor(LONG_MAX);
    }
    

    cb(New refcounted<PersiBPlusTree<K,V,MK,MV,UMK,UMV> >(f, rootMap,
                                                          latestRootPos,
                                                          maxCacheSize));
  }
  
  treeTimestamp insert(const K & key, const V & val) {
    doInsert(key, val);
//    printTree();
    evictCache();
    return getTS();
  }
  
  void search(const K & key, treeTimestamp ts,
              callback<void, K, V>::ref cb, cbe error) {
    doSearch(getRootPos(ts), ts, key, cb, error, false);
    evictCache();
  }

  ptr<V> syncSearch(const K & key, treeTimestamp ts) {
    ptr<V> r = doSearch(getRootPos(ts), ts, key, NULL, NULL, false);
    evictCache();
    return r;
  }


  void predecessor(const K & key, treeTimestamp ts,
                   callback<void, K, V>::ref cb, cbe error) {
    doSearch(getRootPos(ts), ts, key, cb, error, true);
    evictCache();
  }

  ptr<V> syncPredecessor(const K & key, treeTimestamp ts) {
    ptr<V> r = doSearch(getRootPos(ts), ts, key, NULL, NULL, true);
    evictCache();
    return r;
  }

  
  treeTimestamp getTS() {
    return lastTS;
  }

  treeTimestamp commit() {
    // Could also be called flush...
#if PBPT_DEBUG
    warn << "committing ts=" << lastTS << "\n";
#endif
    
    nodeCache.traverse(wrap(this, &PersiBPlusTree::commitCacheEntry));

    if (latestRootPosDirty) {
      rootMap->insert(lastTS, latestRootPos);
      latestRootPosDirty = false;
    }

    fseek(f, 12, SEEK_SET);
    fw.writeLong(lastTS+1);
    
    // Do cache eviction
    evictCache();
    
    return lastTS++;
  }
  
  void printTree() {
    printSubtree(getRootPos(getTS()));
  }

  unsigned long blockTransfers() {
    return blockTransferCount;
  }

  void resetBlockTransfers() {
    blockTransferCount = 0;
  }
  
  
protected:
  const MK marshK;
  const MV marshV;
  const UMK unmarshK;
  const UMV unmarshV;
  unsigned long branchT;        // branching factor
  FILE *f;
  fileReader fr;
  fileWriter fw;
  treeTimestamp lastTS;
  qhash<unsigned long, PBPlusInternalNode> nodeCache;
  ref<BPlusTree<treeTimestamp, unsigned long> > rootMap;
  unsigned long latestRootPos;
  bool latestRootPosDirty;
  unsigned long blockTransferCount;
  tailq<PBPlusInternalNode,
        &PBPlusInternalNode::cache_qent> cacheReplacementList;
  unsigned long cacheSize;
  unsigned long maxCacheSize;
  
  
  PersiBPlusTree (FILE * f, ref<BPlusTree<treeTimestamp,
                                          unsigned long> > rootMap,
                  unsigned long latestRootPos,
                  unsigned long maxCacheSize)
    : marshK(MK()), marshV(MV()), unmarshK(UMK()), unmarshV(UMV()),
      f(f), fr(f), fw(f), rootMap(rootMap), latestRootPos(latestRootPos),
      latestRootPosDirty(false), blockTransferCount(0), cacheSize(0),
      maxCacheSize(maxCacheSize)
    {
    // Read header
    fseek(f, 0, SEEK_SET);
    
    if (fr.readLong() != PBPLUSTREE_MAGIC_NUMBER) {
      fatal << "PersiBPlusTree file invalid -- bad header magic\n";
    }
    branchT = fr.readLong();
    unsigned long ignore = fr.readLong();
    lastTS = fr.readLong();

#if PBPT_DEBUG
    warn << "Loaded PersiBPlusTree header: branchT=" << branchT << "\n";
#endif
    
    PBPlusInternalNode * root = getInternalNode(getRootPos(), lastTS);
#if PBPT_DEBUG
    warn << "Loaded PersiBPlusTree; root@" << getRootPos()
         << " with " << root->numElements << " children\n";
#endif
    warn << "PersiBPlusTree: node size=" << 2*padLen() << "\n";
  }

  unsigned long getRootPos() {
    return getRootPos(lastTS);
  }
  
  unsigned long getRootPos(treeTimestamp ts) {
    if (ts == lastTS) {
      return latestRootPos;
    } else {
      return *rootMap->syncPredecessor(ts);
    }
  }

  bool nodeIsInternal(unsigned long pos) {
    // Check if it's in the cache first.
    if (nodeCache[pos] != NULL) {
      return true;
    }
    
    fseek(f, pos, SEEK_SET);
    unsigned long magic = fr.readLong();
    if (fr.hasError() || fr.eos()) {
      fatal << "nodeIsInternal: read error/eos\n";
    }
    if (magic == PBPLUS_INTERNAL_MAGIC_NUMBER) {
      return true;
    } else if (magic == PBPLUS_LEAF_MAGIC_NUMBER) {
      return false;
    } else {
      fatal << "nodeIsInternal: magic is neither internal nor leaf\n";
    }
  }

  unsigned long padLen() {
    return PBPlusInternalNode::padLen(branchT, marshK.maxLength());
  }
  
  PBPlusInternalNode * getInternalNode(unsigned long pos, treeTimestamp ts,
                                       bool forceMBox = false) {
    if (nodeCache[pos] == NULL) {
      fseek(f, pos, SEEK_SET);
      PBPlusInternalNode *n = PBPlusInternalNode::load(fr, branchT,
                                                       marshK.maxLength(),
                                                       pos,
                                                       false);
      nodeCache.insert(pos, *n);
      blockTransferCount++;
      cacheReplacementList.insert_tail(nodeCache[pos]);
      delete n;
    }

    if (nodeCache[pos | CACHE_MBOX] == NULL) {
      fseek(f, pos + padLen(), SEEK_SET);
      PBPlusInternalNode *n = PBPlusInternalNode::load(fr, branchT,
                                                       marshK.maxLength(),
                                                       pos,
                                                       true);
      nodeCache.insert(pos | CACHE_MBOX, *n);
      blockTransferCount++;
      cacheReplacementList.insert_tail(nodeCache[pos | CACHE_MBOX]);
      delete n;
    }

    cacheReplacementList.remove(nodeCache[pos]);
    cacheReplacementList.insert_tail(nodeCache[pos]);
    cacheReplacementList.remove(nodeCache[pos | CACHE_MBOX]);
    cacheReplacementList.insert_tail(nodeCache[pos | CACHE_MBOX]);
    
    PBPlusInternalNode *nBox = nodeCache[pos | CACHE_MBOX];
    if (forceMBox || (nBox->ts != 0 && nBox->ts <= ts)) {
      return nBox;
    } else {
      return nodeCache[pos];
    }
    
  }

  void putInternalNode(PBPlusInternalNode * n) {
    blockTransferCount++;
    if (n->mbox) {
      fseek(f, n->pos + padLen(), SEEK_SET);
    } else {
      fseek(f, n->pos, SEEK_SET);
    }
    
#if PBPT_DEBUG
    warn << "writing internal node at " << ftell(f)
         << "  " << n->numElements << " elements\n";
#endif
    fw.writeStrWithoutLen(n->marshal());
    if (fw.hasError() || fflush(f) || fseek(f, 0, SEEK_SET)) {
      fatal << "PersiBPlusTree: failed to put internal node\n";
      return;
    }
  }
  
  PBPlusLeafNode getLeafNode(unsigned long pos) {
    fseek(f, pos, SEEK_SET);
//    blockTransferCount++;
    return PBPlusLeafNode(fr, branchT, marshK.maxLength(), pos);
  }

  void putLeafNode(PBPlusLeafNode & n) {
    blockTransferCount++;
    fseek(f, n.pos, SEEK_SET);
#if PBPT_DEBUG
    warn << "writing leaf node at " << ftell(f)
         << " key=" << n.key << " value= " << n.value << "\n";
#endif
    fw.writeStrWithoutLen(n.marshal());
    if (fw.hasError() || fflush(f) || fseek(f, 0, SEEK_SET)) {
      fatal << "PersiBPlusTree: failed to put leaf node\n";
      return;
    }
  }

  PBPlusInternalNode * allocateInternalNode(treeTimestamp ts) {
    fseek(f, 0, SEEK_END);
    unsigned long pos = ftell(f);
#if PBPT_DEBUG
    warn << "allocating internal node at " << pos
         << " mbox at " << pos+padLen() << "\n";
#endif
    PBPlusInternalNode * r = PBPlusInternalNode::create(
      branchT, marshK.maxLength(), pos, false, ts);
    r->writable = true;
    putInternalNode(r);
    nodeCache.insert(r->pos, *r);
    cacheReplacementList.insert_tail(nodeCache[r->pos]);
    delete r;

    fseek(f, pos+padLen(), SEEK_SET);
    r = PBPlusInternalNode::create(branchT, marshK.maxLength(), pos, true, 0);
    putInternalNode(r);
    nodeCache.insert(r->pos | CACHE_MBOX, *r);
    cacheReplacementList.insert_tail(nodeCache[r->pos | CACHE_MBOX]);
    delete r;

    return nodeCache[pos];
  }

  PBPlusLeafNode createLeafNode(K key, V val) {
    fseek(f, 0, SEEK_END);
    PBPlusLeafNode r(branchT, marshK.maxLength(), ftell(f));
    r.key = marshK(key);
    r.value = marshV(val);
#if PBPT_DEBUG
    warn << "created leaf node " << key << ":"  << val
         << " @ " << r.pos << "\n";
#endif
    putLeafNode(r);
    return r;
  }

  void setRoot(PBPlusInternalNode * r) {
#if PBPT_DEBUG
    warn << "moving root to " << r->pos << "\n";
#endif
    latestRootPos = r->pos;
    latestRootPosDirty = true;
  }

  /**
   * Ensure that an internal node is modifiable, performing
   * switches-to-modification-boxes or path-copying as necessary in
   * accordance with the modified Sleator-Tarjan persistence scheme.
   *
   * Precondition: ancestors is a tailq of the path to n -- the head
   * is the immediate parent of n; the tail is the root.
   *
   * Postcondition: ancestors continues to meet the precondition,
   * though some of the pointers may need to be changed if parents
   * need to be made writable as a result of path copying.
   */
  PBPlusInternalNode *
  makeWritable(PBPlusInternalNode *n,
               ref<ancestorQueue > ancestors) 
    {
      if (n->writable) {
        return n;
      }

      if (n->ts == getTS()) {
        // Already at the current timestamp, can just make it
        // writable. Actually, I don't think this should happen...
        n->writable = true;
        return n;
      }

      if (!n->mbox) {
#if PBPT_DEBUG
        warn << "Switching to mbox in node at " << n->pos << "\n";
#endif
        // Not already in a mbox, so we can go ahead and grab the
        // mbox half of the block...
        PBPlusInternalNode *m = getInternalNode(n->pos, getTS(), true);
        if (m->ts != 0) {
          fatal << "mbox is not available!\n";
        }
        m->ts = getTS();
        m->writable = true;
        m->numElements = n->numElements;
        unsigned long i;
        for (i = 0; i < n->numElements; i++) {
          m->keys[i] = n->keys[i];
          m->children[i] = n->children[i];
        }
        m->children[i] = n->children[i];

        return m;
        
      } else {
#if PBPT_DEBUG
        warn << "Path-copying node at " << n->pos << "\n";
#endif
        // mbox is already taken, so we need to make a new copy...
        PBPlusInternalNode *m = allocateInternalNode(getTS());
        m->writable = true;
        m->numElements = n->numElements;
        unsigned long i;
        for (i = 0; i < n->numElements; i++) {
          m->keys[i] = n->keys[i];
          m->children[i] = n->children[i];
        }
        m->children[i] = n->children[i];

        // Now update the parent to point to that copy...
        PBPlusInternalNode *parent = ancestors->first;
        if (parent != NULL) {
          ancestors->remove(parent);
          parent = makeWritable(parent, ancestors);
          for (i = 0; i < parent->numElements+1; i++) {
            if (parent->children[i] == n->pos) {
              parent->children[i] = m->pos;
            }
            ancestors->insert_head(parent);
          }
        } else {
          // There is no parent, i.e. n is the root.
#if PBPT_DEBUG
          warn << "Path copy changed root\n";
#endif
          setRoot(m);
        }
        return m;
      }
    }
  

  
  /**
   * Split a full child of a non-full node.
   * Preconditions:
   *   y is the ith child of x
   *   y is full (2T-1) elements; x is not full
   *   z is a newly-allocated empty node
   *   ancestors is an ancestorQueue starting at x's parent
   * Postcondition:
   *   z adopts the t largest children of y, and becomes the new i+1th
   *    child of x
   *   ancestors is updated, and still starts at x's parent
   *
   * Ref: CLRS B-Tree-Split-Child, p. 444
   */
  void splitChild(PBPlusInternalNode * &x, unsigned long i,
                  PBPlusInternalNode * &y, PBPlusInternalNode * &z,
                  ref<ancestorQueue> ancestors)
    {
      if (x->numElements == 2 * branchT-1 ||
          y->numElements != 2 * branchT-1 ||
          z->numElements != 0) {
            fatal << "splitChild: numElements precondition violated\n";
          }
      if (x->children[i] != y->pos) {
        fatal<< "splitChild: x[i]=y precondition violated\n";
      }

      x = makeWritable(x, ancestors);
      ancestors->insert_head(x);
      y = makeWritable(y, ancestors);
      ancestors->remove(x);
      
      z->numElements = branchT-1;
      
      for (unsigned long j = 0; j < branchT - 1; j++) {
        z->keys[j] = y->keys[branchT + j];
      }
      
      for (unsigned long j = 0; j < branchT; j++) {
        z->children[j] = y->children[branchT + j];
      }
      
      y->numElements = branchT-1;

      for (unsigned long j = x->numElements; j > i; j--) {
        x->children[j+1] = x->children[j];
      }
      
      x->children[i+1] = z->pos;
      
      if (x->numElements != 0) {
        for (long j = x->numElements-1; j >= (long) i; j--) {
          x->keys[j+1] = x->keys[j];
        }
      }
      
      
      x->keys[i] = y->keys[branchT-1];

      x->numElements++;
//       putInternalNode(x);
//       putInternalNode(y);
//       putInternalNode(z);
    }

  /**
   * Insert key:val into the tree.
   *
   * Ref: CLRS B-Tree-Insert, p. 445
   */
  void doInsert(K key, V val) {
    ref<ancestorQueue> ancestors = New refcounted<ancestorQueue>();
    
    //TRACE();
    PBPlusInternalNode * r = getInternalNode(getRootPos(), getTS());
    if (r->numElements == 2*branchT-1) {
      //TRACE();
      // Root is full, split it
      r = makeWritable(r, ancestors);
      PBPlusInternalNode * s = allocateInternalNode(getTS());
      s->children[0] = r->pos;
      PBPlusInternalNode * z = allocateInternalNode(getTS());
      setRoot(s);
      splitChild(s, 0, r, z, ancestors);
      insertNonFull(s, key, val, ancestors);
    } else {
      insertNonFull(r, key, val, ancestors);
    }
  }

  /**
   * Insert key:val into the tree rooted at x. Assumes that x is
   * non-full.
   */
  void insertNonFull(PBPlusInternalNode * x, K key, V val,
                     ref<ancestorQueue> ancestors) {
    //TRACE();
                       
    x = makeWritable(x, ancestors);
    
    if (x->numElements == 0 && x->children[0] == 0) {
      //TRACE();
      // Empty subtree (should only happen on root of empty tree)
      PBPlusLeafNode l = createLeafNode(key, val);
      x->children[0] = l.pos;
//      putInternalNode(x);
      
    } else if (nodeIsInternal(x->children[0])) {

      //TRACE();
      // Children aren't leaves
      long i;
      for (i = x->numElements-1; i >= 0; i--) {
        if (unmarshK(x->keys[i]) <= key)
          break;
      }
      i++;
      // x.keys[i] is the smallest elt > key

      PBPlusInternalNode * c = getInternalNode(x->children[i], getTS());

      if (c->numElements == 2*branchT-1) {
        //TRACE();
        PBPlusInternalNode * z = allocateInternalNode(getTS());
        splitChild(x, i, c, z, ancestors);
        ancestors->insert_head(x);
        if (key >= unmarshK(x->keys[i])) {
          // Move to right side of the new split
          i++;
          insertNonFull(z, key, val, ancestors);
        } else {
          insertNonFull(c, key, val, ancestors);
        } 
      } else {
        ancestors->insert_head(x);
        insertNonFull(c, key, val, ancestors);
      }
      
    } else {
      
      //TRACE();
      // Children are leaves, and at least one already exists
      long i;
      // Unless we're inserting at the far left, we have to
      // special-case this

      K farLeftChildKey = unmarshK(getLeafNode(x->children[0]).key);
      if (key == farLeftChildKey) {
        PBPlusLeafNode l = createLeafNode(key, val);
        x->children[0] = l.pos;
      }
      
      if (key < farLeftChildKey) {
        for (i = x->numElements-1; i >= 0; i--) {
          x->keys[i+1] = x->keys[i];
          x->children[i+2] = x->children[i+1];
        }
        //TRACE();
        PBPlusLeafNode ol = getLeafNode(x->children[0]);
        PBPlusLeafNode nl = createLeafNode(key, val);
        x->keys[0] = ol.key;
        x->children[1] = ol.pos;
        x->children[0] = nl.pos;
        x->numElements++;
//        putInternalNode(x);
      } else {
        // Check whether this key already exists, and if so change its
        // value
        for (unsigned long j = 0; j < x->numElements; j++) {
          if (unmarshK(x->keys[j]) == key) {
            PBPlusLeafNode l = createLeafNode(key, val);
            x->children[j+1] = l.pos;
            return;
          }
        }

        // Otherwise, insert it
        
        for (i = x->numElements-1; i >= 0; i--) {
          if (unmarshK(x->keys[i]) < key) {
            break;
          }
          x->keys[i+1] = x->keys[i];
          x->children[i+2] = x->children[i+1];
        }
        //TRACE();
        x->keys[i+1] = marshK(key);
        PBPlusLeafNode l = createLeafNode(key, val);
        x->children[i+2] = l.pos;
        x->numElements++;
//        putInternalNode(x);
      }
      
    }
  }

  ptr<V>
  doSearch(unsigned int pos, treeTimestamp ts, K k,
           callback<void,K,V>::ptr cb,
           callback<void,pStat>::ptr error,
           bool predecessorOK) {
#if PBPT_DEBUG
    warn << "doSearch: pos=" << pos << "\n";
#endif
    if (nodeIsInternal(pos)) {
      PBPlusInternalNode * x = getInternalNode(pos, ts);
      
      unsigned long i;
      if (x->numElements == 0) {
        if (x->children[0] == 0) {
          // empty tree case
          if (error) {
            error(PERR_NOENT);
          }
          return NULL;
        } else {
          i = 0;
        }
      } else if (k < unmarshK(x->keys[0])) {
        i = 0;
      } else {
        for (i = 0; i < x->numElements; i++) {
          if (k < unmarshK(x->keys[i])) {
//            warn << "k=" << k << " <= x.keys[i]=" << unmarshK(x.keys[i]) << "\n";
            break;
          }
        }
      }
//      warn << "i = " << i << "\n";
      return doSearch(x->children[i], ts, k, cb, error, predecessorOK);
    } else {
      // Leaf node
      PBPlusLeafNode x = getLeafNode(pos);
#if PBPT_DEBUG
      warn << "doSearch: reached leaf key=" << unmarshK(x.key) << "\n";
#endif
      if (unmarshK(x.key) == k || (predecessorOK && (unmarshK(x.key) <= k)))
      {
        K rKey = unmarshK(x.key);
        V rVal = unmarshV(x.value);
        if (cb) {
          cb(rKey, rVal);
        }
        return New refcounted<V>(rVal);
      } else {
        if (error) {
            error(PERR_NOENT);
        }
        return NULL;
      }
    } 
  }


  void printSubtree(unsigned int pos) {
    warn << "----------------------------------------\n";
    if (nodeIsInternal(pos)) {
      warn << "Internal node at " << pos << "\n";
      PBPlusInternalNode * n = getInternalNode(pos, getTS());
      warn << "Contains " << n->numElements << " elements\n";
      for (unsigned long i = 0; i < n->numElements; i++) {
        warn << "  " << i << ": " << unmarshK(n->keys[i]) << "\n";
      }
      for (unsigned long i = 0; i <= n->numElements; i++) {
        warn << "Node " << pos << " child #" << i << ": " << n->children[i]
             << "\n";
        printSubtree(n->children[i]);
      }
    } else {
      warn << "LEAF node at " << pos << "\n";
      PBPlusLeafNode l = getLeafNode(pos);
      warn << "key=" << unmarshK(l.key) << "\n";
      warn << "val=" << unmarshV(l.value) << "\n";
    }
    warn << "----------------------------------------\n";
  }

  void commitCacheEntry(const unsigned long & pos,
                        PBPlusInternalNode *n)
    {
#if PBPT_DEBUG
      warn << "considering cache commit for " << (long) pos;
#endif
      if (n->writable) {
#if PBPT_DEBUG
        warn << "... committing\n";
#endif
        putInternalNode(n);
        n->writable = false;
      } else {
#if PBPT_DEBUG
        warn << "\n";
#endif
      }
      
    }

  void evictCache() {
    while (nodeCache.size() > maxCacheSize) {
      PBPlusInternalNode *n = cacheReplacementList.first;
#if PBPT_DEBUG
      warn << "evicting node at pos=" << n->pos << "\n";
#endif
      if (n->writable) {
#if PBPT_DEBUG
        warn << "flushing node at pos=" << n->pos << "\n";
#endif
        putInternalNode(n);
      }
      
      delete [] n->keys;
      delete [] n->children;
      nodeCache.remove(n->pos | (n->mbox ? CACHE_MBOX : 0));
      cacheReplacementList.remove(n);
      delete n;
    }
  }
};


#endif
