#include "logginginodelog.h"

// When true, snapshot and replay activity in this file is traced via warn
static const bool DEBUG_SNAPSHOT = false;

// Shorthand for binding a loggingInodeLog member function (plus any leading
// arguments) to this object through wrap().  Note that "args..." is the GNU
// named-variadic macro syntax.
#define WRAP(args...) wrap(this, &loggingInodeLog::args)

// Format of inode log file
//  magic (MAGIC_NUMBER)   4 bytes
//  entry*
//
// All entries
//  length                 4 bytes   (total entry size: header, payload and footer)
//  entry type             1 byte
//  timestamp              sizeof(timestamp)
//  payload                variable
//  last snapshot offset   4 bytes   (this snapshot if this is a snapshot)
//
// Inode modify payload (entry type 1)
//  inumber                sizeof(inumber)
//  blockAddress           sizeof(blockAddress)
//
// Inode delete payload (entry type 2)
//  inumber                sizeof(inumber)
//
// Inode map snapshot payload (entry type 3)
//  prev. snapshot offset  4 bytes   (0 if this is the first)
//  next free inumber      sizeof(inumber)
//  root inumber           sizeof(inumber)
//  (inumber, blockAddress)* (sizeof(inumber)+sizeof(blockAddress))*n

// Value written at the very start of every inode log file; the constructor
// refuses to open a file that does not begin with it.
static const unsigned long MAGIC_NUMBER = 0x474f4c49;

// Entry type byte stored in each entry header (see the format description
// above)
enum entryTypes
{
  ENTRY_MODIFY = 1,
  ENTRY_DELETE = 2,
  ENTRY_SNAPSHOT = 3
};

// Sizes of the fields common to all entries: length + type + timestamp
// before the payload, and the last-snapshot offset after it.
// NOTE(review): the format description says the length/offset fields are 4
// bytes, but these use sizeof(long), which is 8 on LP64 platforms -- confirm
// this matches what fileWriter::writeLong actually emits.
static const unsigned int ENTRY_HEADER_LEN =
  sizeof(long)+sizeof(char)+sizeof(timestamp);
static const unsigned int ENTRY_FOOTER_LEN = sizeof(long);

// Payload sizes of modify and delete entries
static const unsigned int MODIFY_LEN = sizeof(inumber)+sizeof(blockAddress);
static const unsigned int DELETE_LEN = sizeof(inumber);

// Snapshot entries: the common header plus previous snapshot offset, next
// free inumber and root inumber, followed by one (inumber, blockAddress)
// pair per mapped inode, followed by the common footer.
static const unsigned int SNAPSHOT_HEADER_LEN =
  ENTRY_HEADER_LEN+sizeof(long)+sizeof(inumber)+sizeof(inumber);
static const unsigned int SNAPSHOT_ENTRY_LEN =
  sizeof(inumber)+sizeof(blockAddress);
static const unsigned int SNAPSHOT_FOOTER_LEN = ENTRY_FOOTER_LEN;

//
// inodeLog
//

// Open the inode log stored in filename, creating and initializing it if it
// does not exist yet, and pass a new loggingInodeLog wrapping it to cb.
//
// A freshly created file receives the magic number followed by an empty
// snapshot entry (timestamp 0, next free inumber 1, root inumber 0), so a
// log always contains at least one snapshot.
//
// If the file cannot be opened or initialized, error(PERR_IO) is invoked
// instead of cb and the file handle is released.
void
loggingInodeLog::create(str filename, ref<superblob> blob,
                        callback<void, ref<inodeLog> >::ref cb, cbe error)
{
  // String literals are const, so hold the mode through a pointer to const
  const char *mode;
  FILE *fp;
  bool initialize;

  if (access(filename, F_OK) == 0) {
    // File exists, so load it
    initialize = false;
    mode = "r+b";
  } else {
    // File doesn't exist, so create it
    initialize = true;
    mode = "w+b";
  }

  // Open the file
  fp = fopen(filename.cstr(), mode);
  if (!fp) {
    error(PERR_IO);
    return;
  }

  // Do I need to initialize the contents?
  if (initialize) {
    bool ok;
    {
      // The writer lives in its own scope so that it is gone before the
      // file is closed on the failure path below
      fileWriter w(fp);
      // Put a header on the file
      w.writeLong(MAGIC_NUMBER);
      // Insert an empty inode map snapshot at the beginning of time
      unsigned long offset = ftell(fp);
      // Write length
      w.writeLong(SNAPSHOT_HEADER_LEN +
                  0*SNAPSHOT_ENTRY_LEN +
                  SNAPSHOT_FOOTER_LEN);
      // Write type
      w.writeByte(ENTRY_SNAPSHOT);
      // Write timestamp
      timestamp ts = (timestamp)0;
      w.write(&ts, sizeof(timestamp));
      // Write previous snapshot offset
      w.writeLong(0);
      // Write next free inumber
      inumber num = (inumber)1;
      w.write(&num, sizeof(inumber));
      // Write root inumber (currently unknown)
      num = (inumber)0;
      w.write(&num, sizeof(inumber));
      // Write last snapshot offset (this is a snapshot, so it's this one)
      w.writeLong(offset);
      ok = !(w.hasError() || fflush(fp) || fseek(fp, 0, SEEK_SET));
    }
    if (!ok) {
      // Don't leak the stream when initialization fails
      fclose(fp);
      error(PERR_IO);
      return;
    }
  }

  cb(New refcounted<loggingInodeLog>(fp, blob));
}

// Attach to an already opened inode log file.  Verifies the magic number,
// locates the most recent snapshot through the footer of the last entry,
// rebuilds the current inode map from it, loads the chain of older
// snapshots and, if periodic snapshots are enabled, starts the timer.
loggingInodeLog::loggingInodeLog(FILE *fp, ref<superblob> blob)
  : fp(fp), blob(blob), currentMap(this), snapshots(this),
    readStartPos(0), bytesRead(0)
{
  entryCount = logSize = 0;
  
  // Confirm header
  fseek(fp, 0, SEEK_SET);
  fileReader r(fp);
  if (r.readLong() != MAGIC_NUMBER)
    fatal << "Incorrect magic number in inode log file\n";

  // Find location of the most recent snapshot.  ENTRY_FOOTER_LEN is
  // unsigned, so it must be converted to a signed type *before* negating;
  // otherwise the negation wraps and, where long is wider than int, the
  // seek offset becomes a huge positive number.
  fseek(fp, -static_cast<long>(ENTRY_FOOTER_LEN), SEEK_END);
  long lastSnapshot = r.readLong();

  // Load the most recent snapshot and bring the world up to speed
  currentMap.replayFromSnapshot(lastSnapshot, TIMESTAMP_LATEST);

  // Find locations of all past snapshots
  snapshots.load(lastSnapshot);

  // Start the snapshot timer
  if (SNAPSHOT_PERIODICALLY)
    scheduleNextSnapshot();
}

// Closes the log file this object was constructed with
loggingInodeLog::~loggingInodeLog()
{
  FILE *handle = fp;
  fclose(handle);
}

// Store an inode: the marshalled inode is handed to the superblob, and
// put_afterBlobPut continues once the blob has stored it.
void
loggingInodeLog::put(inode node,
                     callback<void, timestamp>::ref cb, cbe error)
{
  // Turn the inode into its serialized form
  str serialized = marshaller<inode>()(node);

  // Hand it to the superblob; failures go straight to the error callback
  blob->put(serialized,
            WRAP(put_afterBlobPut, node, cb, error),
            error);
}

// Second half of put(): records in the log that node.num now lives at
// address, refreshes the log statistics and reports the timestamp of the
// modification to cb (or PERR_IO to error if the stream is in error).
void
loggingInodeLog::put_afterBlobPut(inode node,
                                  callback<void, timestamp>::ref cb, cbe error,
                                  blockFingerprint fingerprint,
                                  blockAddress address)
{
  // Stamp the change and append it to the log / current map
  timestamp stamp = now();
  currentMap.modify(stamp, node.num, address);

  // Refresh the statistics
  fseek(fp, 0, SEEK_END);
  logSize = ftell(fp);
  ++entryCount;

  // Without the periodic timer, every change may trigger a snapshot
  if (!SNAPSHOT_PERIODICALLY)
    maybeTakeSnapshot();

  if (!ferror(fp)) {
    cb(stamp);
    return;
  }
  error(PERR_IO);
}

// Delete an inode: appends a delete entry for num to the log and reports
// the timestamp of the deletion to cb (or PERR_IO to error if the stream is
// in error).
// NOTE(review): unlike put_afterBlobPut, this path does not refresh logSize
// or entryCount -- confirm whether that is intentional.
void
loggingInodeLog::del(inumber num,
                     callback<void, timestamp>::ref cb, cbe error)
{
  // Stamp the deletion and append it to the log / current map
  timestamp stamp = now();
  currentMap.remove(stamp, num);

  // Without the periodic timer, every change may trigger a snapshot
  if (!SNAPSHOT_PERIODICALLY)
    maybeTakeSnapshot();

  if (!ferror(fp)) {
    cb(stamp);
    return;
  }
  error(PERR_IO);
}

// Look up inode num as of time ts and load it from the superblob.
// TIMESTAMP_LATEST is answered from the in-memory current map; any other
// time is answered by replaying the log from the best matching snapshot
// into a temporary map.  Unknown inumbers are reported as PERR_NOENT.
void
loggingInodeLog::get(timestamp ts, inumber num,
                     callback<void, inode>::ref cb, cbe error)
{
  blockAddress where;

  if (ts != TIMESTAMP_LATEST) {
    // Historical lookup: rebuild the map as it was at time ts
    inodeMap historical(this);
    historical.replayFromSnapshot(snapshots.findSnapshot(ts), ts);
    where = historical.get(num);
  } else {
    // The current map already reflects the latest state
    where = currentMap.get(num);
  }

  // The map answers (blockAddress)(-1) for inumbers it does not know
  if (where == (blockAddress)(-1)) {
    error(PERR_NOENT);
    return;
  }

  // Fetch the inode itself from the blob
  blob->getDirect(where, WRAP(get_afterBlobGet, cb, error), error);
}

// Second half of get(): unmarshals the block content returned by the
// superblob and passes the resulting inode to cb.
void
loggingInodeLog::get_afterBlobGet(callback<void, inode>::ref cb, cbe error,
                                  str content, blockAddress ba)
{
  unmarshaller<inode> decode;
  cb(decode(content));
}

// Allocate a fresh inumber from the current inode map and pass it to cb
void
loggingInodeLog::newInumber(callback<void, inumber>::ref cb, cbe error)
{
  inumber fresh = currentMap.newInumber();
  cb(fresh);
}

// Record num as the root inumber.  The root may only be set while it is
// still 0; a second attempt is treated as a bug.  The value is persisted by
// taking a snapshot right away, after which cb is invoked.
void
loggingInodeLog::setRootInumber(inumber num, callback<void>::ref cb, cbe error)
{
  const bool alreadySet = (currentMap.getRootInumber() != 0);
  if (alreadySet)
    fatal << "BUG: Attempt to set already set root inumber\n";

  currentMap.setRootInumber(num);

  // The root inumber is only stored in snapshot entries, so write one now
  takeSnapshot();

  cb();
}

// Pass the root inumber recorded in the current inode map to cb
void
loggingInodeLog::getRootInumber(callback<void, inumber>::ref cb, cbe error)
{
  cb(currentMap.getRootInumber());
}

unsigned long
loggingInodeLog::blockTransfers()
{
  return bytesRead;
}

void
loggingInodeLog::resetBlockTransfers()
{
  bytesRead = 0;
}

// Arm the timer that calls maybeTakeSnapshot(true) after SNAPSHOT_TIME.
// Calling this while periodic snapshots are disabled is a bug.
void
loggingInodeLog::scheduleNextSnapshot()
{
  const bool periodic = SNAPSHOT_PERIODICALLY;
  if (!periodic) {
    fatal << "BUG: scheduleNextSnapshot called, but periodic snapshots"
          << " are disabled\n";
  }

  delaycb(SNAPSHOT_TIME, WRAP(maybeTakeSnapshot, true));
}

// Take a snapshot if enough modifications have accumulated since the last
// one.  scheduled tells whether the request comes from the snapshot timer;
// when periodic snapshots are enabled, all other requests are ignored and
// the timer is re-armed after each scheduled request.
void
loggingInodeLog::maybeTakeSnapshot(bool scheduled)
{
  // In periodic mode only the timer is allowed to trigger snapshots
  if (SNAPSHOT_PERIODICALLY && !scheduled)
    return;

  const int pending = currentMap.getModificationCount();

  if (pending >= SNAPSHOT_MIN_CHANGES) {
    takeSnapshot();
  } else if (DEBUG_SNAPSHOT && pending > 0) {
    warn << "snapshot: Not taking (only " << pending << " modifications)\n";
  }

  // Keep the timer running
  if (SNAPSHOT_PERIODICALLY)
    scheduleNextSnapshot();
}

void
loggingInodeLog::takeSnapshot()
{
  timestamp ts = now();
  if (DEBUG_SNAPSHOT)
    warn << "snapshot: Taking snapshot at time " << ts << "\n";
  unsigned long offset = currentMap.writeSnapshot(ts);
  if (DEBUG_SNAPSHOT)
    warn << "snapshot: Done.  Located at " << offset << "\n";
  snapshots.add(ts, offset);
}

// Begin an accounted section of log file I/O by remembering the current
// file position.  A non-zero readStartPos means a section is already open,
// which is a bug.
void
loggingInodeLog::startRead()
{
  const bool sectionOpen = (readStartPos != 0);
  if (sectionOpen)
    fatal << "BUG: inode start read without end read\n";

  readStartPos = ftell(fp);
}

// End the accounted section opened by startRead(): the distance the file
// position advanced is added to bytesRead and the section is marked closed.
void
loggingInodeLog::endRead()
{
  unsigned long endPos = ftell(fp);

  // The position must not have moved backwards during the section
  if (endPos < readStartPos)
    fatal << "BUG: Read ended at offset < beginning offset\n";

  const unsigned long advanced = endPos - readStartPos;
  bytesRead += advanced;
  readStartPos = 0;
}

//
// inodeMap
//

// Create an empty inode map bound to log and its file.  The map carries no
// useful state until replayFromSnapshot() has been called on it.
inodeMap::inodeMap(loggingInodeLog *log)
  : ts(0), log(log), fp(log->fp), latest(0),
    lastSnapshot(0), rootInumber(0), modifications(0)
{
  // Give nextInumber a defined value instead of leaving it uninitialized
  // until a snapshot has been read.  1 matches the "next free inumber"
  // stored in the initial snapshot of a fresh log.  Assigned here rather
  // than in the initializer list so the list's order is left untouched.
  nextInumber = (inumber)1;
}

// Forget all inumber -> blockAddress mappings and reset the count of
// modifications
void
inodeMap::clear() 
{
  modifications = 0;
  imap.clear();
}

// Rebuild this map from the log: start from the snapshot stored at
// snapshotOffset, then apply the entries that follow it up to time `to`.
void
inodeMap::replayFromSnapshot(unsigned long snapshotOffset, timestamp to)
{
  if (DEBUG_SNAPSHOT) {
    warn << "inode map: Replaying from offset " << snapshotOffset
         << " to time " << to << "\n";
  }

  // Start from a clean slate
  clear();

  // Load the snapshot itself ...
  fseek(fp, snapshotOffset, SEEK_SET);
  readSnapshot();

  // ... and roll forward from there
  replay(to);
}

// Load the snapshot entry located at the current file position into this
// map: timestamp, next free inumber, root inumber and all
// (inumber, blockAddress) pairs.  On return the file is positioned right
// after the snapshot entry.  Any inconsistency in the entry is fatal.
void
inodeMap::readSnapshot()
{
  fileReader r(fp);

  lastSnapshot = ftell(fp);

  log->startRead();

  // Read header
  long length = r.readLong();
  entryTypes type = (entryTypes)r.readByte();
  r.read(&ts, sizeof(timestamp));
  if (r.hasError())
    fatal << "Failed to read inode log snapshot entry header\n";
  latest = ts;

  // Make sure this is a snapshot
  if (type != ENTRY_SNAPSHOT)
    fatal << "Incorrect inode log entry type; expected snapshot, got "
          << type <<"\n";

  // A snapshot is never shorter than its fixed-size parts.  Checking this
  // up front keeps a corrupt length field from turning into a bogus
  // (negative or wrapped-around) entry count below.
  const long fixedLen =
    static_cast<long>(SNAPSHOT_HEADER_LEN + SNAPSHOT_FOOTER_LEN);
  if (length < fixedLen)
    fatal << "Inode log snapshot entry length is too small\n";

  // Read (and discard) previous snapshot offset
  r.readLong();
  
  // Read next inumber
  r.read(&nextInumber, sizeof(inumber));

  // Read root inumber
  r.read(&rootInumber, sizeof(inumber));

  // The per-inode loop below is skipped for an empty snapshot, so check the
  // fixed fields here
  if (r.hasError())
    fatal << "Failed to read inode log snapshot entry header\n";

  // How many inodes are in this snapshot?
  int ninodes = static_cast<int>((length - fixedLen) /
                                 static_cast<long>(SNAPSHOT_ENTRY_LEN));

  for (int i = 0; i < ninodes; ++i) {
    inumber num;
    blockAddress ba;

    r.read(&num, sizeof(inumber));
    r.read(&ba, sizeof(blockAddress));

    if (r.hasError() || r.eos())
      fatal << "Failed to read inode log snapshot entry\n";
    imap.insert(num, ba);
  }

  // Read last snapshot offset and make sure it's what it's supposed
  // to be.  Kept as unsigned long (like lastSnapshot) so that the
  // comparison is not done on a truncated value.
  unsigned long checkLastOffset = r.readLong();
  if (checkLastOffset != lastSnapshot)
    fatal << "Expected last snapshot offset " << lastSnapshot
          << " got " << checkLastOffset << "\n";

  log->endRead();
}

// Apply log entries to this map, starting at the current file position and
// continuing until either the end of the file or the first entry whose
// timestamp is later than `to`.
//
// Reaching the end of the file marks the map as current
// (ts == TIMESTAMP_LATEST), which is what verifyWritable() requires.
// Stopping early leaves ts at the timestamp of the last applied entry.
void
inodeMap::replay(timestamp to)
{
  fileReader r(fp);

  log->startRead();

  while (true) {
    // Read length
    long length = r.readLong();

    if (feof(fp)) {
      // Hit EOF; done replaying
      inodeLog::logSize = ftell(fp);
      ts = TIMESTAMP_LATEST;
      log->endRead();
      return;
    }
    
    // Read entry type
    entryTypes type = (entryTypes)r.readByte();
    // Read timestamp
    timestamp curTS;
    r.read(&curTS, sizeof(timestamp));

    if (r.hasError())
      fatal << "Failed to read inode log snapshot entry header\n";

    // How's my timestamp?
    if (curTS > to) {
      if (DEBUG_SNAPSHOT)
        warn << "inode map: Found ts " << curTS << ".  Done replaying\n";
      log->endRead();
      return;
    }

    if (DEBUG_SNAPSHOT)
      warn << "inode map: Replaying ts "
           << curTS << " of type " << type << " against ts " << to << "\n";

    if (type == ENTRY_MODIFY) {
      // Modify (or insert) this inode into the map
      inumber num;
      blockAddress ba;
      r.read(&num, sizeof(inumber));
      r.read(&ba, sizeof(blockAddress));
      imap.insert(num, ba);
      latest = curTS;
      if (num >= nextInumber)
        nextInumber = num + 1;
      ++modifications;
    } else if (type == ENTRY_DELETE) {
      // Delete the inode from the map
      inumber num;
      r.read(&num, sizeof(inumber));
      imap.remove(num);
      latest = curTS;
      ++modifications;
    } else if (type == ENTRY_SNAPSHOT) {
      // Update latest timestamp
      latest = curTS;
      // Reset modifications count
      modifications = 0;
      // But skip the snapshot.  The length field covers the whole entry
      // including its footer, and the header has already been consumed, so
      // skip only the payload: the footer is read at the bottom of the
      // loop like for every other entry.  Skipping it here as well would
      // make that read consume the next entry's length field.
      int forward = length - ENTRY_HEADER_LEN - ENTRY_FOOTER_LEN;
      fseek(fp, forward, SEEK_CUR);
    } else {
      fatal << "Unknown entry type " << type << " in inode log @ "
            << ftell(fp)-sizeof(timestamp) << "\n";
    }
    ts = curTS;

    // Read the snapshot offset in the entry footer
    lastSnapshot = r.readLong();
  }
}

// Append a snapshot of this map, stamped with `its`, to the end of the log
// and return the file offset at which it was written.  The map must be
// current and `its` must not lie before the latest entry (see
// verifyWritable).  The new snapshot becomes lastSnapshot and the count of
// modifications is reset.
unsigned long
inodeMap::writeSnapshot(timestamp its)
{
  verifyWritable(its);

  fileWriter w(fp);
  fseek(fp, 0, SEEK_END);
  unsigned long offset = ftell(fp);

  log->startRead();

  // Put length
  w.writeLong(SNAPSHOT_HEADER_LEN +
              (SNAPSHOT_ENTRY_LEN)*imap.size() +
              SNAPSHOT_FOOTER_LEN);
  // Put entry type
  w.writeByte(ENTRY_SNAPSHOT);
  // Put timestamp
  w.write(&its, sizeof(timestamp));
  // Put previous snapshot offset
  w.writeLong(lastSnapshot);
  // Put next inode number
  w.write(&nextInumber, sizeof(inumber));
  // Put root inumber
  w.write(&rootInumber, sizeof(inumber));
  // Put inumber, blockAddress pairs
  imap.traverse(wrap(this, &inodeMap::writeSnapshotEntry));
  // Put last snapshot offset (this snapshot)
  w.writeLong(offset);
  // Done.  Flush like modify() and remove() do, so the snapshot does not
  // linger in the stdio buffer; callers take snapshots to persist state.
  fflush(fp);

  log->endRead();

  latest = its;
  lastSnapshot = offset;
  modifications = 0;
  return offset;
}

void
inodeMap::writeSnapshotEntry(const inumber &num, blockAddress *ba)
{
  fileWriter w(fp);
  
  w.write(&num, sizeof(inumber));
  w.write(ba, sizeof(blockAddress));
}

// Record that inode num lives at block address ba as of time `its`: a
// modify entry is appended to the log and flushed, then the in-memory map
// is updated to match.  The map must be current (see verifyWritable).
void
inodeMap::modify(timestamp its, inumber num, blockAddress ba)
{
  verifyWritable(its);

  fileWriter out(fp);
  fseek(fp, 0, SEEK_END);

  log->startRead();

  // Header: length, entry type, timestamp
  out.writeLong(ENTRY_HEADER_LEN + MODIFY_LEN + ENTRY_FOOTER_LEN);
  out.writeByte(ENTRY_MODIFY);
  out.write(&its, sizeof(timestamp));
  // Payload: inumber and its new block address
  out.write(&num, sizeof(inumber));
  out.write(&ba, sizeof(blockAddress));
  // Footer: offset of the most recent snapshot
  out.writeLong(lastSnapshot);
  fflush(fp);

  log->endRead();

  // Mirror the change in memory
  latest = its;
  ++modifications;
  if (num >= nextInumber)
    nextInumber = num + 1;
  imap.insert(num, ba);
}

// Record the deletion of inode num as of time `its`: a delete entry is
// appended to the log and flushed, then the inode is dropped from the
// in-memory map.  The map must be current (see verifyWritable) and must
// contain num.
void
inodeMap::remove(timestamp its, inumber num)
{
  verifyWritable(its);

  blockAddress *existing = imap[num];
  if (!existing)
    fatal << "Attempt to remove non-existent inode from map\n";

  fileWriter out(fp);
  fseek(fp, 0, SEEK_END);

  log->startRead();

  // Header: length, entry type, timestamp
  out.writeLong(ENTRY_HEADER_LEN + DELETE_LEN + ENTRY_FOOTER_LEN);
  out.writeByte(ENTRY_DELETE);
  out.write(&its, sizeof(timestamp));
  // Payload: the inumber being deleted
  out.write(&num, sizeof(inumber));
  // Footer: offset of the most recent snapshot
  out.writeLong(lastSnapshot);
  fflush(fp);

  log->endRead();

  // Mirror the change in memory
  latest = its;
  ++modifications;
  imap.remove(num);
}

// Return the block address recorded for inode num, or (blockAddress)(-1)
// if the map has no entry for it
blockAddress
inodeMap::get(inumber num)
{
  blockAddress *found = imap[num];
  return found ? *found : (blockAddress)(-1);
}

// Hand out the next free inumber and advance the counter.  Only allowed on
// a current map (see verifyWritable).
inumber
inodeMap::newInumber()
{
  verifyWritable(TIMESTAMP_LATEST);

  inumber allocated = nextInumber;
  ++nextInumber;
  return allocated;
}

int
inodeMap::getModificationCount()
{
  return modifications;
}

unsigned long
inodeMap::getNumSnapshotEntries()
{
  return imap.size();
}

// Root inumber recorded in this map (0 until one has been set or loaded
// from a snapshot)
inumber
inodeMap::getRootInumber()
{
  return this->rootInumber;
}

// Remember num as the root inumber.  This changes only the in-memory
// value; it reaches the log with the next snapshot that is written.
void
inodeMap::setRootInumber(inumber num)
{
  this->rootInumber = num;
}

// Guard for all mutating operations: the map must have been replayed to the
// end of the log (ts == TIMESTAMP_LATEST) and the timestamp `its` of the
// new entry must not precede the latest entry.  Violations are fatal.
void
inodeMap::verifyWritable(timestamp its)
{
  const bool isCurrent = (ts == TIMESTAMP_LATEST);
  if (!isCurrent)
    fatal << "Attempt to put an inode mapping into a non-current inodeMap\n";

  const bool goesBack = (its < latest);
  if (goesBack)
    fatal << "Attempt to go backwards in time\n";
}

//
// snapshotMap
//

// Create an empty snapshot map bound to log and the file it operates on;
// load() fills it in
snapshotMap::snapshotMap(loggingInodeLog *log)
  : log(log),
    fp(log->fp)
{
}

void
snapshotMap::clear()
{
  snapshots.clear();
}

// Build the list of all snapshots in the log.  Starting from the snapshot
// at lastSnapshotOffset, the chain of "previous snapshot offset" fields is
// followed back to the first snapshot (whose previous offset is 0).  The
// (timestamp, offset) pairs are appended to `snapshots` in chronological
// order.  A broken chain is fatal.
void
snapshotMap::load(unsigned long lastSnapshotOffset)
{
  fileReader r(fp);
  // Keep a temporary copy that's in reverse chronological order.
  // We'll reverse this when we're done loading.
  vec<timeOffsetPair> pairs;
  unsigned long currentOffset = lastSnapshotOffset;
  timestamp prevTS = TIMESTAMP_LATEST;

  while (true) {
    // Skip to the start of the snapshot entry
    fseek(fp, currentOffset, SEEK_SET);

    log->startRead();

    // Read (and discard) length field
    r.readLong();

    // Read entry type
    entryTypes type = (entryTypes)r.readByte();
    if (type != ENTRY_SNAPSHOT) {
      fatal << "Found a non-snapshot entry at " << currentOffset
            << " while loading snapshot map\n";
    }

    // Read timestamp.  We are walking backwards in time, so it must not be
    // later than that of the snapshot visited just before.
    timestamp ts;
    r.read(&ts, sizeof(timestamp));
    if (ts > prevTS)
      fatal << "Snapshot at " << currentOffset
            << " (" << ts << ") younger than next snapshot ("
            << prevTS << ")\n";
    // Remember it, so the check above is meaningful for the next snapshot
    prevTS = ts;

    // Read previous snapshot offset
    unsigned long prevOffset = r.readLong();

    log->endRead();

    // Record this snapshot
    timeOffsetPair pair;
    pair.ts = ts;
    pair.offset = currentOffset;
    pairs.push_back(pair);

    // Can I get off this boat?
    if (prevOffset == 0) {
      break;
    }

    // Snapshots are only ever appended, so an older snapshot always lives
    // at a lower offset.  Anything else is corruption and would otherwise
    // make this loop run forever.
    if (prevOffset >= currentOffset)
      fatal << "Snapshot at " << currentOffset
            << " points to previous snapshot at " << prevOffset << "\n";
    
    // On to the next snapshot
    currentOffset = prevOffset;
  }

  // Reverse the pairs so it's in chronological order.  Decrement before
  // dereferencing so the pointer never moves below base().
  snapshots.reserve(pairs.size());
  for (timeOffsetPair *p = pairs.lim(); p != pairs.base(); ) {
    --p;
    snapshots.push_back(*p);
  }
}

// Register a newly written snapshot (timestamp ts, file offset `offset`).
// Snapshots must be added in chronological order; a timestamp earlier than
// that of the newest known snapshot is fatal.
void
snapshotMap::add(timestamp ts, unsigned long offset)
{
  const bool inThePast = (ts < snapshots.back().ts);
  if (inThePast)
    fatal << "Attempt to create snapshot in the past\n";

  timeOffsetPair entry;
  entry.offset = offset;
  entry.ts = ts;
  snapshots.push_back(entry);
}

// Return the file offset of the best snapshot to start replaying from in
// order to reconstruct time ts: the last snapshot whose timestamp is not
// later than ts, or the first snapshot if all of them are later.
unsigned long
snapshotMap::findSnapshot(timestamp ts)
{
  // XXX Make this use binary search

  // Fast path: the newest snapshot is already old enough
  const timeOffsetPair &newest = snapshots.back();
  if (newest.ts <= ts)
    return newest.offset;

  // Otherwise scan forward, remembering the last snapshot that qualifies
  timeOffsetPair *best = snapshots.base();
  for (timeOffsetPair *cur = snapshots.base();
       cur < snapshots.lim() && cur->ts <= ts;
       ++cur) {
    best = cur;
  }

  return best->offset;
}
