/*
 * Superblob module
 *
 * This module is responsible for maintaining the superblob and
 * ensuring that multiple entries with the same content do not consume
 * more than the space of one copy. Essentially, this implements
 * content addressable storage.
 */

#ifndef SUPERBLOB_H
#define SUPERBLOB_H

#include "persifs.h"
#include "marshal.h"
#include "happyio.h"
#include <sha1.h>
#include <stdio.h>

// Magic number written as the first field of a marshalled
// blockFingerprint and checked again when one is unmarshalled.
const unsigned int BLOCK_FINGERPRINT_MAGIC_NUMBER = 0x3f7628d3;
// Magic number written as the first field of a marshalled blockAddress
// and checked again when one is unmarshalled.
const unsigned int BLOCK_ADDRESS_MAGIC_NUMBER = 0x3d83d26a;

// Raw storage for one SHA-1 digest: an array of sha1::hashsize bytes.
typedef u_int8_t sha1Fingerprint[sha1::hashsize];

// Forward declarations so that strbuf_cat() is known before the
// blockFingerprint class body is parsed.
// NOTE(review): presumably this exists so code inside the class can
// stream a fingerprint into a strbuf (cf. the commented-out debug output
// in operator<) -- confirm before removing.
class blockFingerprint;
const strbuf & strbuf_cat(const strbuf &sb, const blockFingerprint &bf);


// The two methods for addressing blocks: directly by blockAddress, or
// by content through a blockFingerprint.
typedef unsigned long blockAddress;
// A block fingerprint is a sha1Fingerprint wrapped in a class.  The
// wrapper exists because libasync's callback system seems to implode
// when an array is passed as an argument; the price is that equality
// and ordering have to be spelled out by hand below.
//
// Ordering treats the LAST byte of fp as the most significant one:
// comparison starts at index sha1::hashsize - 1 and walks down to 0.
class blockFingerprint
{
public:
  // The raw digest bytes.
  sha1Fingerprint fp;

  // Construct the all-zero fingerprint.
  blockFingerprint() {
    int idx = sha1::hashsize;
    while (idx-- > 0) {
      fp[idx] = 0;
    }
  }

  // Unmarshal a fingerprint from s: one long holding
  // BLOCK_FINGERPRINT_MAGIC_NUMBER followed by sha1::hashsize bytes.
  // Reports through `fatal` when the magic number does not match, or
  // when the reader signals eos() once all fields have been read.
  blockFingerprint(str s) {
    strReader reader(s);
    if (reader.readLong() != BLOCK_FINGERPRINT_MAGIC_NUMBER) {
      fatal << "blockFingerprint: could not unmarshal -- bad magic\n";
    }
    for (int pos = 0; pos != sha1::hashsize; ++pos) {
      fp[pos] = reader.readByte();
    }
    if (reader.eos()) {
      fatal << "blockFingerprint: could not unmarshal -- premature EOS\n";
    }
  }

  // Two fingerprints are equal when every byte matches.
  inline bool
  operator==(const blockFingerprint & b) const {
    int idx = 0;
    while (idx < sha1::hashsize && fp[idx] == b.fp[idx]) {
      ++idx;
    }
    // Reaching the end means no mismatch was found.
    return idx == sha1::hashsize;
  }

  inline bool
  operator!=(const blockFingerprint & b) const {
    return !operator==(b);
  }

  // Strict ordering: locate the highest-indexed byte that differs and
  // let that byte alone decide.  Equal fingerprints are not "less".
  inline bool
  operator<(const blockFingerprint & b) const {
    int idx = sha1::hashsize - 1;
    while (idx >= 0 && fp[idx] == b.fp[idx]) {
      --idx;
    }
    return idx >= 0 && fp[idx] < b.fp[idx];
  }

  // The remaining comparisons are all derived from operator<, which is
  // a total order over the byte arrays.
  inline bool
  operator<=(const blockFingerprint & b) const {
    return !(b < *this);
  }

  inline bool
  operator>(const blockFingerprint & b) const {
    return b < *this;
  }

  inline bool
  operator>=(const blockFingerprint & b) const {
    return b <= *this;
  }
};

class blobIndex;

/**
 * Abstract interface to the superblob store.  Concrete stores implement
 * put/get/getDirect; each operation reports its result through a
 * callback and takes a separate `error` callback.
 */
class superblob
{
public:
  // Class-wide counters shared by all superblob instances.
  // NOTE(review): they are defined and updated outside this header;
  // judging by the names they track the number of entries, the blob
  // size and how often a put was coalesced with an existing block --
  // confirm against the implementation.
  static unsigned long blobEntryCount;
  static unsigned long blobSize;
  static unsigned long chunkFusionHits;

  /**
   * Virtual destructor: this class is a polymorphic base (it has pure
   * virtual members), so destroying a concrete store through a
   * superblob pointer must run the derived destructor.
   */
  virtual ~superblob() {}

  /**
   * Create an empty superblob if filename does not exist.  Otherwise,
   * use the file in filename for the superblob data.  This is a
   * factory function.
   * (The declaration is currently commented out.)
   */
//   static void create(str filename, ref<blobIndex> bi,
//                      callback<void, ref<superblob> >::ref cb, cbe error);

  /**
   * Put new blocks into the superblob (coalescing with existing
   * blocks if possible).
   *
   * blobContent: the content to store.
   * cb:          invoked with a blockFingerprint and a blockAddress
   *              (presumably those of the stored block).
   * error:       callback of type cbe (declared elsewhere), presumably
   *              invoked instead of cb on failure.
   */
  virtual void put(str blobContent,
                   callback<void, blockFingerprint, blockAddress>::ref cb,
                   cbe error) = 0;

  /**
   * Get a block from the blob by fingerprint.
   *
   * fingerprint: fingerprint of the block to fetch.
   * cb:          invoked with a str and a blockAddress (presumably the
   *              block's content and where it lives).
   * error:       callback of type cbe, presumably invoked on failure.
   */
  virtual void get(blockFingerprint fingerprint,
                   callback<void, str, blockAddress>::ref cb,
                   cbe error) = 0;

  /**
   * Get a block directly from the blob by its address.  This does not
   * require a fingerprint lookup and is therefore faster.
   *
   * address: address of the block to fetch.
   * cb:      invoked with a str and a blockAddress, as for get().
   * error:   callback of type cbe, presumably invoked on failure.
   */
  virtual void getDirect(blockAddress address,
                         callback<void, str, blockAddress>::ref cb,
                         cbe error) = 0;
};

/**
 * Append a printable form of a block fingerprint to a strbuf: the
 * literal prefix "block:" followed by a hexdump of each digest byte,
 * emitted from the last byte of fp down to the first.
 *
 * Returns the strbuf that was passed in, so calls can be chained.
 */
inline const strbuf &
strbuf_cat(const strbuf &sb, const blockFingerprint &bf)
{
  sb << "block:";

  int idx = sha1::hashsize;
  while (idx-- > 0) {
    // One byte at a time, highest index first.
    sb << hexdump(&bf.fp[idx], sizeof(u_int8_t));
  }

  return sb;
}

/**
 * Hash functor specialization so that block fingerprints can be used
 * as keys in a qhash/ihash.  The hash is hash_bytes() taken over the
 * whole digest array.
 */
template<>
struct hashfn<blockFingerprint> 
{
  // User-provided (empty) default constructor, so the functor can be
  // default-initialized even when declared const.
  hashfn() {}

  hash_t operator() (const blockFingerprint &b) const {
    const sha1Fingerprint &digest = b.fp;
    return hash_bytes(&digest, sizeof(digest));
  }
};

/**
 * Marshaller for block fingerprints.
 *
 * Marshalled layout, in write order:
 *   - one long holding BLOCK_FINGERPRINT_MAGIC_NUMBER
 *   - sha1::hashsize digest bytes, fp[0] first
 *
 * The matching decoder is the blockFingerprint(str) constructor.
 */
template<>
struct marshaller<blockFingerprint>
{
  marshaller() { }

  // Serialize b into a str using the layout described above.
  str operator()(const blockFingerprint &b) const {
    strWriter sw;
    sw.writeLong(BLOCK_FINGERPRINT_MAGIC_NUMBER);
    for (int i = 0; i < sha1::hashsize; i++) {
      sw.writeByte(b.fp[i]);
    }
    return sw.getStr();
  }

  // Upper bound on the marshalled size in bytes.  The magic number is
  // emitted with writeLong(), so it is budgeted as a full long (the
  // same per-long allowance marshaller<blockAddress> uses) rather than
  // as a single byte, plus one byte per digest byte.
  unsigned long maxLength() const {
    return (sizeof(unsigned long) + sha1::hashsize * sizeof(u_int8_t));
  }
};


/**
 * Block addresses need to be both marshalled and unmarshalled.  This
 * is the marshalling half: it writes BLOCK_ADDRESS_MAGIC_NUMBER as one
 * long, followed by the address as a second long.
 */
template<>
struct marshaller<blockAddress>
{
  marshaller() { }

  // Serialize the address b, prefixed by the magic number.
  str operator()(const blockAddress &b) const {
    strWriter writer;
    writer.writeLong(BLOCK_ADDRESS_MAGIC_NUMBER);
    writer.writeLong(b);
    return writer.getStr();
  }

  // Size budget in bytes: one long for the magic number plus one for
  // the address itself.
  unsigned long maxLength() const {
    return sizeof(unsigned long) + sizeof(blockAddress);
  }
};

/**
 * Unmarshaller for block addresses: reads one long that must equal
 * BLOCK_ADDRESS_MAGIC_NUMBER, then one long holding the address.
 * Reports through `fatal` on a magic mismatch, or when the reader
 * signals eos() after the address has been read.
 */
template<>
struct unmarshaller<blockAddress>
{
  unmarshaller() { }

  blockAddress operator()(str s) const {
    strReader reader(s);

    const bool magicMatches =
      (reader.readLong() == BLOCK_ADDRESS_MAGIC_NUMBER);
    if (!magicMatches) {
      fatal << "blockAddress: could not unmarshal -- bad magic\n";
    }

    const blockAddress address = reader.readLong();
    if (reader.eos()) {
      fatal << "blockAddress: could not unmarshal -- premature EOS\n";
    }

    return address;
  }
};



#endif // SUPERBLOB_H
