#include "inodelog.h"
#include "happyio.h"
#include <async.h>

// Format of inode log file
//  magic (MAGIC_NUMBER)   4 bytes
//  entry*
//
// All entries
//  length                 4 bytes   (includes whole header)
//  entry type             1 byte
//  timestamp              sizeof(timestamp)
//  payload                variable
//  last snapshot offset   4 bytes   (this snapshot if this is a snapshot)
//
// Inode modify payload (entry type 1)
//  inumber                sizeof(inumber)
//  blockAddress           sizeof(blockAddress)
//
// Inode delete payload (entry type 2)
//  inumber                sizeof(inumber)
//
// Inode map snapshot payload (entry type 3)
//  prev. snapshot offset  4 bytes   (0 if this is the first)
//  next inode number      sizeof(inumber)
//  (inumber, blockAddress)* (sizeof(inumber)+sizeof(blockAddress))*n

// Signature written as the first field of a newly created inode log file and
// checked by the inodeLog constructor before anything else is read.
// Taken least-significant byte first, the value is ASCII 'I','L','O','G'.
static const unsigned long MAGIC_NUMBER = 0x474f4c49;

// Values of the one-byte "entry type" field that tags every log entry.
// The numbers are written to and read back from the log file, so they must
// stay in sync with the format description at the top of this file.
enum entryTypes
{
  ENTRY_MODIFY = 1,    // payload: inumber + blockAddress
  ENTRY_DELETE = 2,    // payload: inumber
  ENTRY_SNAPSHOT = 3,  // payload: prev. snapshot offset, next inumber, pairs
};

//
// inodeLog
//

// Opens the inode log stored in `filename` and hands a new inodeLog to `cb`.
// If the file does not exist it is created and seeded with the magic number
// followed by an empty snapshot entry, so that the log always starts with a
// snapshot.
//
//  filename  path of the log file
//  blob      superblob that holds the marshalled inodes
//  cb        receives the constructed inodeLog on success
//  error     called with PERR_IO if the file cannot be opened or initialized
void
inodeLog::create(str filename, ref<superblob> blob,
                 callback<void, ref<inodeLog> >::ref cb, cbe error)
{
  // Points at string literals, so it must be pointer-to-const
  const char *mode;
  FILE *fp;
  bool initialize;

  if (access(filename, F_OK) == 0) {
    // File exists, so load it
    initialize = false;
    mode = "r+b";
  } else {
    // File doesn't exist, so create it
    initialize = true;
    mode = "w+b";
  }

  // Open the file
  fp = fopen(filename.cstr(), mode);
  if (!fp) {
    error(PERR_IO);
    return;
  }

  // Do I need to initialize the contents?
  if (initialize) {
    fileWriter w(fp);
    // Put a header on the file
    w.writeLong(MAGIC_NUMBER);

    // Insert an empty inode map snapshot at the beginning of time.  It
    // starts right after the magic number; remember where, because a
    // snapshot's footer records its own offset.
    long snapshotOffset = ftell(fp);

    // Length covers every field written below: the common header, the
    // snapshot payload (previous offset + next inumber, no pairs) and the
    // footer.  This is the same formula writeSnapshot() uses with zero pairs.
    w.writeLong(sizeof(long)+sizeof(char)+sizeof(timestamp)+
                sizeof(long)+sizeof(inumber)+
                sizeof(long));
    w.writeByte(ENTRY_SNAPSHOT);
    timestamp ts = (timestamp)0;
    w.write(&ts, sizeof(timestamp));
    // Previous snapshot offset: 0, since this is the first snapshot
    w.writeLong(0);
    // Next inode number
    inumber num = (inumber)0;
    w.write(&num, sizeof(inumber));
    // Footer: last snapshot offset (this snapshot)
    w.writeLong(snapshotOffset);

    if (snapshotOffset < 0 || w.hasError() || fflush(fp) ||
        fseek(fp, 0, SEEK_SET)) {
      // Don't leak the stream when initialization fails
      fclose(fp);
      error(PERR_IO);
      return;
    }
  }

  cb(New refcounted<inodeLog>(fp, blob));
}

// Attaches to an already-open log file positioned at its start.  Verifies the
// magic number, rebuilds the current inode map from the most recent snapshot,
// indexes all earlier snapshots and starts the periodic snapshot timer.
// Any inconsistency in the file is fatal.
inodeLog::inodeLog(FILE *fp, ref<superblob> blob)
  : fp(fp), blob(blob), currentMap(fp), snapshots(fp)
{
  // Confirm header
  fileReader r(fp);
  if (r.readLong() != MAGIC_NUMBER)
    fatal << "Incorrect magic number in inode log file\n";

  // Find location of the most recent snapshot: it is the footer of the
  // final entry.  Negate as a signed value; -sizeof(long) is an unsigned
  // expression that only yields a negative offset by wraparound.
  if (fseek(fp, -(long)sizeof(long), SEEK_END))
    fatal << "Failed to seek to end of inode log file\n";
  long lastSnapshot = r.readLong();
  if (r.hasError())
    fatal << "Failed to read last snapshot offset from inode log file\n";

  // Load the most recent snapshot and bring the world up to speed
  currentMap.replayFromSnapshot(lastSnapshot, TIMESTAMP_LATEST);

  // Find locations of all past snapshots
  snapshots.load(lastSnapshot);

  // Start the snapshot timer
  scheduleNextSnapshot();
}

// Closes the log file that was handed to the constructor.
// NOTE(review): scheduleNextSnapshot() registers a delaycb bound to `this`
// and nothing visible here cancels it -- confirm the object cannot be
// destroyed while that timer is pending.
inodeLog::~inodeLog()
{
  fclose(fp);
}

// Returns the timestamp to stamp on a new log entry: the current value of
// time(NULL).
timestamp
inodeLog::now()
{
  timestamp current = time(NULL);
  return current;
}

// Stores a new version of `node`.  This is the first step of a callback
// chain: marshall -> put_afterMarshall -> put_afterBlobPut.
//
//  node   inode to store
//  cb     eventually receives the timestamp of the logged modification
//  error  passed to every step of the chain for failure reporting
void
inodeLog::put(inode node,
              callback<void, timestamp>::ref cb, cbe error)
{
  // Serialize the inode
  node.marshall(wrap(this, &inodeLog::put_afterMarshall, node, cb, error),
                error);
}

// Second step of put(): receives the marshalled inode `m` and stores it in
// the superblob, continuing in put_afterBlobPut.  `node`, `cb` and `error`
// are carried along unchanged from put().
void
inodeLog::put_afterMarshall(inode node,
                            callback<void, timestamp>::ref cb, cbe error,
                            inode::marshalled m)
{
  // Put the inode in the superblob
  blob->put(m, wrap(this, &inodeLog::put_afterBlobPut, node, cb, error),
            error);
}

// Final step of put(): the inode now lives in the blob at `address`, so
// append a modify entry mapping node.num to that address and report the
// entry's timestamp to `cb`.  `fingerprint` is not used here.
// Reports PERR_IO through `error` if the log stream's error indicator is set
// after the write.
void
inodeLog::put_afterBlobPut(inode node,
                           callback<void, timestamp>::ref cb, cbe error,
                           blockFingerprint fingerprint,
                           blockAddress address)
{
  timestamp stamp = now();

  // Append the modification to the log and the in-memory map
  currentMap.modify(stamp, node.num, address);

  if (ferror(fp)) {
    error(PERR_IO);
    return;
  }
  cb(stamp);
}

// Logs the deletion of inode `num` and reports the deletion's timestamp to
// `cb`.
//
//  error  called with PERR_NOENT if `num` is not in the current inode map,
//         or with PERR_IO if the log stream's error indicator is set after
//         the write
void
inodeLog::del(inumber num,
              callback<void, timestamp>::ref cb, cbe error)
{
  // inodeMap::remove() treats an unknown inumber as fatal.  A caller passing
  // a stale inumber should get an error back, as in get(), rather than
  // bring the whole process down.
  if (currentMap.get(num) == (blockAddress)(-1)) {
    error(PERR_NOENT);
    return;
  }

  timestamp ts = now();

  // Log the inode deletion
  currentMap.remove(ts, num);
  if (ferror(fp))
    error(PERR_IO);
  else
    cb(ts);
}

// Looks up inode `num` as of time `ts` and delivers it to `cb`.
// TIMESTAMP_LATEST is answered from the in-memory current map; any other
// timestamp is answered by rebuilding a temporary map from the log.
// Calls `error` with PERR_NOENT when the map has no address for `num`.
void
inodeLog::get(timestamp ts, inumber num,
              callback<void, inode>::ref cb, cbe error)
{
  blockAddress where;

  if (ts != TIMESTAMP_LATEST) {
    // Rebuild the map as it stood at `ts`, starting from the snapshot the
    // snapshot index picks for that time
    inodeMap historical(fp);
    historical.replayFromSnapshot(snapshots.findSnapshot(ts), ts);
    where = historical.get(num);
  } else {
    // The current map already reflects the latest state
    where = currentMap.get(num);
  }

  // (blockAddress)(-1) is the map's "no such inode" marker
  if (where == (blockAddress)(-1)) {
    error(PERR_NOENT);
    return;
  }

  // Load the inode from the blob
  blob->getDirect(where, wrap(this, &inodeLog::get_afterBlobGet, cb, error),
                  error);
}

// Continuation of get(): `content` is the block fetched from the blob;
// unmarshall it into an inode and deliver it to `cb`.  `ba` is not used.
void
inodeLog::get_afterBlobGet(callback<void, inode>::ref cb, cbe error,
                           str content, blockAddress ba)
{
  inode::unmarshall(content, cb, error);
}

// Allocates a fresh inode number from the current map and passes it to `cb`.
// `cb` is invoked before this function returns; `error` is never invoked.
void
inodeLog::newInumber(callback<void, inumber>::ref cb, cbe error)
{
  cb(currentMap.newInumber());
}

// Arms a timer so that maybeTakeSnapshot() runs after SNAPSHOT_TIME.
// NOTE(review): the callback is bound to the raw `this` pointer; presumably
// the object outlives the timer -- confirm.
void
inodeLog::scheduleNextSnapshot()
{
  delaycb(SNAPSHOT_TIME, wrap(this, &inodeLog::maybeTakeSnapshot));
}

// Timer callback: writes a snapshot if at least SNAPSHOT_MIN_CHANGES
// modifications have accumulated in the current map, then re-arms the timer
// either way.
void
inodeLog::maybeTakeSnapshot()
{
  const bool enoughChanges =
    currentMap.getModificationCount() >= SNAPSHOT_MIN_CHANGES;

  if (enoughChanges)
    takeSnapshot();

  scheduleNextSnapshot();
}

void
inodeLog::takeSnapshot()
{
  timestamp ts = now();
  unsigned long offset = currentMap.writeSnapshot(ts);
  snapshots.add(ts, offset);
}

//
// inodeMap
//

// Creates an empty map that reads from and appends to the log stream `fp`.
// The map starts with ts == 0, so verifyWritable() rejects writes until
// replay() has reached the end of the log and set ts to TIMESTAMP_LATEST.
inodeMap::inodeMap(FILE *fp)
  : ts(0), fp(fp), latest(0), lastSnapshot(0), modifications(0)
{
  // Give nextInumber a defined value; otherwise it is indeterminate until
  // readSnapshot() loads one from the log.
  nextInumber = (inumber)0;
}

// Forgets every inumber -> blockAddress mapping and zeroes the modification
// counter.  The other members (ts, latest, lastSnapshot, nextInumber) are
// left untouched.
void
inodeMap::clear() 
{
  modifications = 0;
  imap.clear();
}

void
inodeMap::replayFromSnapshot(unsigned long snapshotOffset, timestamp to)
{
  clear();
  fseek(fp, lastSnapshot, SEEK_SET);
  readSnapshot();
  replay(TIMESTAMP_LATEST);
}

// Reads one complete snapshot entry starting at the current file position
// and loads it into this map: sets lastSnapshot to the entry's offset, ts and
// latest to its timestamp, nextInumber to the stored value, and inserts every
// (inumber, blockAddress) pair.  On return the stream is positioned at the
// start of the next entry.  Any read failure or a non-snapshot entry is
// fatal.
void
inodeMap::readSnapshot()
{
  fileReader r(fp);

  lastSnapshot = ftell(fp);

  // Read header
  long length = r.readLong();
  entryTypes type = (entryTypes)r.readByte();
  r.read(&ts, sizeof(timestamp));
  if (r.hasError())
    fatal << "Failed to read inode log snapshot entry header\n";
  latest = ts;

  // Make sure this is a snapshot
  if (type != ENTRY_SNAPSHOT)
    fatal << "Incorrect inode log entry type; expected snapshot\n";

  // Read (and discard) previous snapshot offset
  r.readLong();

  // Read next inumber
  r.read(&nextInumber, sizeof(inumber));
  if (r.hasError())
    fatal << "Failed to read inode log snapshot entry header\n";

  // How many inodes are in this snapshot?  Work in signed arithmetic: mixing
  // `long` with sizeof makes the subtraction unsigned, so a length smaller
  // than the fixed fields would wrap to a huge count.  The bootstrap snapshot
  // written by inodeLog::create() may record such a short length; treat it
  // as holding no pairs.
  const long fixedSize = (long)(sizeof(long) + sizeof(char) +
                                sizeof(timestamp) +
                                sizeof(long) + sizeof(inumber) +
                                sizeof(long));
  const long pairSize = (long)(sizeof(inumber) + sizeof(blockAddress));
  long ninodes = 0;
  if (length > fixedSize)
    ninodes = (length - fixedSize) / pairSize;

  for (long i = 0; i < ninodes; ++i) {
    inumber num;
    blockAddress ba;

    r.read(&num, sizeof(inumber));
    r.read(&ba, sizeof(blockAddress));

    if (r.hasError())
      fatal << "Failed to read inode log snapshot entry\n";
    imap.insert(num, ba);
  }

  // Consume the footer (last snapshot offset).  replay() expects to start
  // on an entry boundary; without this it would read the footer as the
  // length of the next entry.
  r.readLong();
  if (r.hasError())
    fatal << "Failed to read inode log snapshot entry\n";
}

// Applies log entries, starting at the current file position (which must be
// an entry boundary), to this map.  Stops at end of file or at the first
// entry whose timestamp is later than `to`.
//
// Reaching end of file sets ts to TIMESTAMP_LATEST, which is what marks the
// map as current (see verifyWritable).  Stopping early leaves ts at the
// timestamp of the last entry applied.  Read failures and unknown entry
// types are fatal.
void
inodeMap::replay(timestamp to)
{
  fileReader r(fp);

  while (true) {
    // Read length
    long length = r.readLong();

    if (feof(fp)) {
      // Hit EOF; done replaying
      ts = TIMESTAMP_LATEST;
      return;
    }

    // Read entry type
    entryTypes type = (entryTypes)r.readByte();
    // Read timestamp
    timestamp curTS;
    r.read(&curTS, sizeof(timestamp));

    if (r.hasError())
      fatal << "Failed to read inode log snapshot entry header\n";

    // How's my timestamp?
    if (curTS > to)
      return;

    if (type == ENTRY_MODIFY) {
      // Modify (or insert) this inode into the map
      inumber num;
      blockAddress ba;
      r.read(&num, sizeof(inumber));
      r.read(&ba, sizeof(blockAddress));
      // Don't apply a half-read entry
      if (r.hasError())
        fatal << "Failed to read inode log entry\n";
      imap.insert(num, ba);
      latest = curTS;
      if (num >= nextInumber)
        nextInumber = num + 1;
      ++modifications;
    } else if (type == ENTRY_DELETE) {
      // Delete the inode from the map
      inumber num;
      r.read(&num, sizeof(inumber));
      if (r.hasError())
        fatal << "Failed to read inode log entry\n";
      imap.remove(num);
      latest = curTS;
      ++modifications;
    } else if (type == ENTRY_SNAPSHOT) {
      // Update latest timestamp
      latest = curTS;
      // Reset modifications count
      modifications = 0;
      // But skip the snapshot payload.  Length, type and timestamp have
      // already been consumed and the footer is read below, so all four of
      // those fields come off the entry length.
      long remaining = length - (long)(sizeof(long) + sizeof(char) +
                                       sizeof(timestamp) + sizeof(long));
      if (remaining < 0 || fseek(fp, remaining, SEEK_CUR))
        fatal << "Failed to skip inode log snapshot entry\n";
    } else {
      fatal << "Unknown entry type " << type << " in inode log @ "
            << ftell(fp)-sizeof(timestamp) << "\n";
    }
    ts = curTS;

    // Read the snapshot offset in the entry footer
    lastSnapshot = r.readLong();
    if (r.hasError())
      fatal << "Failed to read inode log entry\n";
  }
}

// Appends a snapshot of the whole map to the end of the log, stamped `its`,
// and returns the file offset at which the snapshot entry starts.  The
// snapshot becomes this map's lastSnapshot and the modification counter is
// reset.  Fatal (via verifyWritable) if the map is not current or `its` is
// older than the latest logged timestamp.
unsigned long
inodeMap::writeSnapshot(timestamp its)
{
  verifyWritable(its);

  fileWriter w(fp);
  fseek(fp, 0, SEEK_END);
  unsigned long offset = ftell(fp);

  // Put length
  w.writeLong(sizeof(long)+sizeof(char)+sizeof(timestamp)+
              sizeof(long)+sizeof(inumber)+
              (sizeof(inumber)+sizeof(blockAddress))*imap.size()+
              sizeof(long));
  // Put entry type
  w.writeByte(ENTRY_SNAPSHOT);
  // Put timestamp
  w.write(&its, sizeof(timestamp));
  // Put previous snapshot offset
  w.writeLong(lastSnapshot);
  // Put next inode number
  w.write(&nextInumber, sizeof(inumber));
  // Put inumber, blockAddress pairs
  imap.traverse(wrap(this, &inodeMap::writeSnapshotEntry));
  // Put last snapshot offset (this snapshot)
  w.writeLong(offset);
  // Done; push the entry out like modify() and remove() do, so a snapshot
  // doesn't linger in the stdio buffer
  fflush(fp);

  latest = its;
  lastSnapshot = offset;
  modifications = 0;
  return offset;
}

// Traversal callback used by writeSnapshot(): writes one
// (inumber, blockAddress) pair at the current position of the log stream.
// The inumber goes first, matching the order readSnapshot() reads them in.
void
inodeMap::writeSnapshotEntry(const inumber &num, blockAddress *ba)
{
  fileWriter w(fp);
  
  w.write(&num, sizeof(inumber));
  w.write(ba, sizeof(blockAddress));
}

// Records that inode `num` now lives at block address `ba`: appends a modify
// entry stamped `its` to the log, flushes it, then updates the in-memory map.
// Fatal (via verifyWritable) if the map is not current or `its` is older than
// the latest logged timestamp.  Write errors are not checked here; callers
// inspect the stream afterwards.
void
inodeMap::modify(timestamp its, inumber num, blockAddress ba)
{
  verifyWritable(its);

  fileWriter out(fp);
  // Entries are only ever appended
  fseek(fp, 0, SEEK_END);

  // Header: length, entry type, timestamp
  const size_t entryLength =
    sizeof(long) + sizeof(char) + sizeof(timestamp) +  // header
    sizeof(inumber) + sizeof(blockAddress) +           // payload
    sizeof(long);                                      // footer
  out.writeLong(entryLength);
  out.writeByte(ENTRY_MODIFY);
  out.write(&its, sizeof(timestamp));

  // Payload: inumber, block address
  out.write(&num, sizeof(inumber));
  out.write(&ba, sizeof(blockAddress));

  // Footer: offset of the most recent snapshot
  out.writeLong(lastSnapshot);
  fflush(fp);

  // Mirror the change in memory
  imap.insert(num, ba);
  latest = its;
  ++modifications;
  // Keep inumber allocation ahead of every inumber seen so far
  if (num >= nextInumber)
    nextInumber = num + 1;
}

// Records the deletion of inode `num`: appends a delete entry stamped `its`
// to the log, flushes it, then drops the inode from the in-memory map.
// Fatal if the map is not writable (see verifyWritable) or if `num` is not
// currently in the map.
void
inodeMap::remove(timestamp its, inumber num)
{
  verifyWritable(its);

  blockAddress *existing = imap[num];
  if (!existing)
    fatal << "Attempt to remove non-existent inode from map\n";

  fileWriter out(fp);
  // Entries are only ever appended
  fseek(fp, 0, SEEK_END);

  // Header: length, entry type, timestamp
  const size_t entryLength =
    sizeof(long) + sizeof(char) + sizeof(timestamp) +  // header
    sizeof(inumber) +                                  // payload
    sizeof(long);                                      // footer
  out.writeLong(entryLength);
  out.writeByte(ENTRY_DELETE);
  out.write(&its, sizeof(timestamp));

  // Payload: inumber
  out.write(&num, sizeof(inumber));

  // Footer: offset of the most recent snapshot
  out.writeLong(lastSnapshot);
  // Done
  fflush(fp);

  // Mirror the delete in memory
  imap.remove(num);
  ++modifications;
  latest = its;
}

// Returns the block address mapped to `num`, or (blockAddress)(-1) if the
// map has no entry for it.
blockAddress
inodeMap::get(inumber num)
{
  blockAddress *found = imap[num];
  return found ? *found : (blockAddress)(-1);
}

// Hands out the next unused inode number and advances the counter.
// Fatal (via verifyWritable) unless this map is the current one.  The
// allocation itself is not logged; nextInumber is persisted only as part of
// a snapshot or implied by later modify entries.
inumber
inodeMap::newInumber()
{
  verifyWritable(TIMESTAMP_LATEST);
  return nextInumber++;
}

// Returns the number of modify/delete entries applied to this map since the
// counter was last reset (by clear(), by writing a snapshot, or by replaying
// past one).
int
inodeMap::getModificationCount()
{
  return modifications;
}

// Guards every mutating operation.  Fatal unless this map has been replayed
// to the end of the log (ts == TIMESTAMP_LATEST) and `its` is not older than
// the newest timestamp already logged.
void
inodeMap::verifyWritable(timestamp its)
{
  const bool isCurrent = (ts == TIMESTAMP_LATEST);
  if (!isCurrent)
    fatal << "Attempt to put an inode mapping into a non-current inodeMap\n";

  // The log must stay in non-decreasing timestamp order
  if (latest > its)
    fatal << "Attempt to go backwards in time\n";
}

//
// snapshotMap
//

// Creates an empty snapshot index over the log stream `fp`.  Nothing is read
// until load() is called.
snapshotMap::snapshotMap(FILE *fp)
  : fp(fp)
{
}

// Drops every (timestamp, offset) pair from the index.
void
snapshotMap::clear()
{
  snapshots.clear();
}

void
snapshotMap::load(unsigned long lastSnapshotOffset)
{
  fileReader r(fp);
  // Keep a temporary copy that's in reverse chronological order.
  // We'll reverse this when we're done loading.
  vec<timeOffsetPair> pairs;
  unsigned long currentOffset = lastSnapshotOffset;
  timestamp prevTS = 0;

  while (true) {
    // Skip to entry type field of snapshot
    fseek(fp, currentOffset+sizeof(long), SEEK_SET);

    // Read entry type
    entryTypes type = (entryTypes)r.readByte();
    if (type != ENTRY_SNAPSHOT) {
      fatal << "Found a non-snapshot entry at " << currentOffset
            << " while loading snapshot map\n";
    }

    // Read timestamp
    timestamp ts;
    r.read(&ts, sizeof(timestamp));
    if (ts > prevTS)
      fatal << "Snapshot at " << currentOffset
            << " younger than next snapshot\n";

    // Read previous snapshot offset
    unsigned long prevOffset = r.readLong();

    // Record this snapshot
    timeOffsetPair pair;
    pair.ts = ts;
    pair.offset = currentOffset;
    pairs.push_back(pair);

    // On to the next snapshot
    if (prevOffset == currentOffset) {
      break;
    }
    currentOffset = prevOffset;
  }

  // Reverse the pairs so it's in chronological order
  snapshots.reserve(pairs.size());
  for (timeOffsetPair *p = pairs.lim()-1; p >= pairs.base(); --p) {
    snapshots.push_back(*p);
  }
}

// Appends a newly written snapshot (taken at `ts`, stored at file offset
// `offset`) to the index.  Fatal if `ts` is older than the newest snapshot
// already indexed.  The index is expected to be non-empty here.
void
snapshotMap::add(timestamp ts, unsigned long offset)
{
  // The index is kept in chronological order
  if (snapshots.back().ts > ts)
    fatal << "Attempt to create snapshot in the past\n";

  timeOffsetPair entry;
  entry.offset = offset;
  entry.ts = ts;
  snapshots.push_back(entry);
}

// Returns the file offset of the best snapshot to start replaying from in
// order to reconstruct the map as of `ts`: the newest snapshot whose
// timestamp is <= `ts`.  If every snapshot is newer than `ts`, the oldest
// snapshot is returned.  The index must be non-empty and is relied upon to
// be in chronological order (load() and add() both enforce that).
unsigned long
snapshotMap::findSnapshot(timestamp ts)
{
  // Special case for the most recent snapshot for efficiency
  if (snapshots.back().ts <= ts)
    return snapshots.back().offset;

  // Binary search for the first snapshot that is newer than `ts`.
  // Invariant: everything before `lo` has ts <= target, everything from
  // `hi` on has ts > target.
  timeOffsetPair *lo = snapshots.base();
  timeOffsetPair *hi = snapshots.lim();
  while (lo < hi) {
    timeOffsetPair *mid = lo + (hi - lo) / 2;
    if (mid->ts <= ts)
      lo = mid + 1;
    else
      hi = mid;
  }

  // `lo` is the first snapshot newer than `ts`; the one before it is the
  // answer.  If there is none before it, fall back to the oldest snapshot.
  if (lo == snapshots.base())
    return lo->offset;
  return (lo - 1)->offset;
}

//
// inode
//

// Rebuilds an inode from its marshalled form `m` and passes it to `cb`.
// Fields are read in the order marshall() writes them: inumber, the fattr3
// portion of the inode, then the content string.  `error` is never invoked;
// no read errors are checked here.
void
inode::unmarshall(marshalled m,
                  callback<void, inode>::ref cb,
                  cbe error)
{
  strReader reader(m);
  inode result;

  // inumber
  reader.read(&result.num, sizeof(inumber));
  // attributes, read straight into the inode viewed as a fattr3
  reader.read((fattr3*)&result, sizeof(fattr3));
  // content
  result.content = reader.readStr();

  cb(result);
}

// Serializes this inode and passes the result to `cb`.  The layout is:
// inumber, the fattr3 portion of the inode, then the content string --
// the order unmarshall() reads them back in.  `error` is never invoked.
void
inode::marshall(callback<void, marshalled>::ref cb, cbe error)
{
  strWriter out;

  // inumber
  out.write(&num, sizeof(inumber));
  // attributes, written straight from the inode viewed as a fattr3
  out.write((fattr3*)this, sizeof(fattr3));
  // content
  out.writeStr(content);

  cb(out.getStr());
}

