#include "chunkable.h"
#include "happyio.h"

//#define DEBUG_FLUSH
//#define DEBUG_WRITE

// Shorthand for binding a chunkable member function to this object:
// WRAP(method, a, b) expands to wrap(this, &chunkable::method, a, b).
// Relies on the GNU named-variadic macro syntax (args...).
#define WRAP(args...) wrap(this, &chunkable::args)

// Value handed to the window constructor in the slide phase of flush.
// NOTE(review): presumably the fingerprint polynomial -- confirm.
#define FINGERPRINT_PT  0xbfe6b8a5bf378d83LL
// flush_slide draws a chunk boundary wherever
// fingerprint % CHUNK_SIZE == CHUNK_BREAK_VALUE.
// NOTE(review): 8096 is not a power of two (8192 is); confirm it is intended.
#define CHUNK_SIZE      8096
#define CHUNK_BREAK_VALUE 0x78
// Not referenced by the code in this file; flush_slide carries an XXX note
// that chunk size limits are not currently honored.
#define MIN_CHUNK_SIZE  2048
#define MAX_CHUNK_SIZE  65535

// Trace streams used by the debug blocks in flush (DEBUG_FLUSH) and
// writeSubstr (DEBUG_WRITE); only defined when the matching switch is on.
#ifdef DEBUG_FLUSH
#define trFlush (warnobj(0)("flush: "))
#endif
#ifdef DEBUG_WRITE
#define trWrite (warnobj(0)("write: "))
#endif

// Build a new chunkable on top of blob whose initial contents are content.
// The object starts with an empty chunk list and is filled by an ordinary
// write at offset zero; cb receives the chunkable once that write finishes.
void
chunkable::create(ref<superblob> blob, str content,
                  callback<void, ref<chunkable> >::ref cb,
                  cbe error)
{
  chunkList emptyList;
  ref<chunkable> fresh = New refcounted<chunkable>(blob, emptyList);

  // Continuation that hands the finished object to the caller
  callback<void>::ref written =
    wrap(&chunkable::create_after_write, fresh, cb, error);

  fresh->writeSubstr(0, content, written, error);
}

// Continuation of create(): the initial write has completed, so deliver the
// freshly built chunkable c to the caller's callback.  error is not used.
void
chunkable::create_after_write(ref<chunkable> c,
                              callback<void, ref<chunkable> >::ref cb,
                              cbe error)
{
  // Nothing left to do but report the finished object
  cb(c);
}

// Construct a chunkable over blob that adopts the chunks linked into cl.
chunkable::chunkable(ref<superblob> blob, chunkList cl)
  : blob(blob)
{
  // The tailq type offers no swap, so the nodes are moved over by hand.
  // The member list is untouched at this point and therefore empty.
  chunks.first = cl.first;
  if (chunks.first) {
    // Re-anchor the head's back pointer on our list and take over the tail
    chunks.plast = cl.plast;
    chunks.first->link.pprev = &chunks.first;
  }

  // Leave the by-value argument as an empty list
  cl.plast = &cl.first;
  cl.first = NULL;
}

// Destroy the chunkable, unlinking and deleting every chunk it still holds.
chunkable::~chunkable()
{
  // Pop from the head until the list is empty; remove() hands back the
  // unlinked chunk so it can be deleted.
  while (chunks.first)
    delete chunks.remove(chunks.first);
}

// Copy constructor: share c's blob and clone each of its chunks, in order,
// into this object's own list.
chunkable::chunkable(const chunkable & c)
  : blob(c.blob)
{
  for (chunk *src = c.chunks.first; src != NULL; src = c.chunks.next(src))
    chunks.insert_tail(New chunk(*src));
}


// Rebuild a chunkable from its marshalled form m (a sequence of
// blockAddress/length records as written by marshall()).  Every chunk is
// created in the CHUNK_NOT_LOADED state.  The resulting chunkable is passed
// to cb; error is not used here.
void
chunkable::unmarshall(ref<superblob> blob, marshalled m,
                      callback<void, ref<chunkable> >::ref cb,
                      cbe error)
{
  chunkList chunks;

  // Unmarshall the chunk vector
  strReader r(m);

  while (true) {
    chunk *ch = New chunk;
    r.read(&ch->address, sizeof(blockAddress));
    ch->length = r.readLong();
    ch->state = CHUNK_NOT_LOADED;

    if (r.eos()) {
      // This record was never linked into the list, so it must be released
      // here or it leaks.
      delete ch;
      break;
    }
    chunks.insert_tail(ch);
  }

  ref<chunkable> c = New refcounted<chunkable>(blob, chunks);

  cb(c);
}

void
chunkable::readSubstr(unsigned long start, unsigned long len,
                      callback<void, str>::ref cb,
                      cbe error)
{
  length(wrap(this, &chunkable::readSubstr_after_getLength,
              start, len, cb, error),
         error);
}

// Continuation of readSubstr() once the total length is known.  Clamps the
// requested range to the end of the chunkable, locates the chunks holding
// its first and last byte, loads them and everything in between, and then
// continues in readSubstr_after_load.  Empty reads (len == 0) and reads
// starting at or past the end deliver an empty string.
void
chunkable::readSubstr_after_getLength(unsigned long start, unsigned long len,
                                      callback<void, str>::ref cb,
                                      cbe error,
                                      unsigned long totalLength)
{
  if (len == 0 || start >= totalLength) {
    // Nothing to read: either an empty request or an attempt to read
    // starting past end of chunkable.  Handling len == 0 here keeps the
    // "last byte" computation below from underflowing.
    cb(str(""));
    return;
  }

  // Find the chunks at the endpoints.  The read length is clamped against
  // the bytes remaining after start so that start + len cannot overflow.
  unsigned long firstPos = start;
  chunk *first = findChunk(&firstPos);
  unsigned long available = totalLength - start;
  unsigned long lastPos = start + MIN(len, available) - 1;
  chunk *last = findChunk(&lastPos);

  if (!first || !last) {
    // Hit end of list before finding start or end
    fatal << "BUG: Unexpected end of chunk list during read seek\n";
  }

  // Kludge: To avoid the wrap limit, error isn't passed
  loadChunks(first, last, wrap(this, &chunkable::readSubstr_after_load,
                               first, firstPos, last, lastPos+1, cb),
             error);
}

// Final step of readSubstr(): the chunks from first through last are
// loaded, so gather the bytes from firstPos within first up to (not
// including) lastPos within last and deliver them to cb.
void
chunkable::readSubstr_after_load(chunk *first, unsigned long firstPos, 
                                 chunk *last, unsigned long lastPos,
                                 callback<void, str>::ref cb)
{
  cb(gatherChunks(first, firstPos, last, lastPos));
}

void
chunkable::writeSubstr(unsigned long start, str s,
                       callback<void>::ref cb,
                       cbe error)
{
  // Get the length
  length(wrap(this, &chunkable::writeSubstr_after_getLength,
              start, s, cb, error),
         error);
}

void
chunkable::writeSubstr_after_getLength(unsigned long start, str s,
                                       callback<void>::ref cb,
                                       cbe error,
                                       unsigned long totalLength)
{
# ifdef DEBUG_WRITE
  trWrite << "Starting write\n";
  debug_dumpChunks();
# endif

  if (start == totalLength) {
    // Simply append a new chunk

    // Don't insert zero-length chunks
    if (s.len() == 0) {
      cb();
      return;
    }

    chunk *c = New chunk;
    c->alterContents(s);
    chunks.insert_tail(c);
    cb();
  } else if (start + s.len() >= totalLength) {
    // This covers two cases. If start is past the end of the string,
    // pad chunkable up with NULLs and try again. If start is before
    // the end of the string but overwrites the entire remainder of
    // it, truncate down and reduce to appending.
    truncate(start, wrap(this, &chunkable::writeSubstr, start, s, cb, error),
             error);
  } else {
    // Now life gets hard.  We need to splice out a range and replace it

    // Don't insert zero-length chunks
    if (s.len() == 0) {
      cb();
      return;
    }
    
    // Create new chunk
    chunk *c = New chunk;
    c->alterContents(s);

    // Find the chunks at the endpoints so they can be split
    unsigned long c1Pos = start, c2Pos = start + s.len();
    chunk *c1 = findChunk(&c1Pos);

    if (!c1)
      fatal << "BUG: Failed to seek to start chunk during write\n";

    // Split c1
    loadAndSplitChunk(c1, c1Pos,
                      WRAP(writeSubstr_after_split1, c, c2Pos, cb, error),
                      error);
  }
}

// Second step of an interior write: the chunk at the start of the range has
// been split into pre1/post1.  Locate the chunk containing the end of the
// range, link the new chunk c in front of the overwritten data, and split
// at the end of the range (continues in writeSubstr_after_split2).
void
chunkable::writeSubstr_after_split1(chunk *c, unsigned long c2Pos,
                                    callback<void>::ref cb, cbe error,
                                    chunk *pre1, chunk *post1)
{
# ifdef DEBUG_WRITE
  trWrite << "Did first split\n";
  debug_dumpChunks();
# endif

  // The end chunk can only be looked up now: if both ends of the range fell
  // into the same chunk, the first split has moved the boundaries.  It must
  // also be looked up before c is linked in, since c2Pos is an offset into
  // the old contents.
  chunk *c2 = findChunk(&c2Pos);
  if (!c2)
    fatal << "BUG: Failed to seek to end chunk during write\n";

  // Link the new chunk in right behind pre1, or at the head if the range
  // starts at the very beginning
  if (!pre1)
    chunks.insert_head(c);
  else
    insertChunkAfter(pre1, c);

# ifdef DEBUG_WRITE
  trWrite << "Inserted new chunk\n";
  debug_dumpChunks();
# endif

  // Split at the end of the range
  loadAndSplitChunk(c2, c2Pos,
                    WRAP(writeSubstr_after_split2, c, post1, cb, error),
                    error);
}

// Last step of an interior write: both ends of the range now sit on chunk
// boundaries, so the old chunks from post1 through pre2 are exactly the
// overwritten data.  Delete them and report completion.
void
chunkable::writeSubstr_after_split2(chunk *c, chunk *post1,
                                    callback<void>::ref cb, cbe error,
                                    chunk *pre2, chunk *post2)
{
# ifdef DEBUG_WRITE
  trWrite << "Did second split\n";
  debug_dumpChunks();
# endif

  // Drop everything between the two split points (inclusive)
  nukeChunks(post1, pre2);

# ifdef DEBUG_WRITE
  trWrite << "Nuked overwritten chunks\n";
  debug_dumpChunks();
  trWrite << "Write done\n";
# endif

  cb();
}

void
chunkable::truncate(unsigned long len,
                    callback<void>::ref cb,
                    cbe error)
{
  // Get the real length
  length(wrap(this, &chunkable::truncate_after_getLength, len, cb, error),
         error);
}

void
chunkable::truncate_after_getLength(unsigned long len,
                                    callback<void>::ref cb,
                                    cbe error,
                                    unsigned long realLength)
{
  if (len == realLength) {
    // Nothing needs to be done
    cb();
  } else if (len < realLength) {
    // Truncate down to len
    unsigned long pos = len;
    chunk *c = findChunk(&pos);

    if (!c)
      fatal << "BUG: Unexpected end of chunk list in truncate seek\n";

    loadAndSplitChunk(c, pos, WRAP(truncate_after_split, cb, error), error);
  } else if (len > realLength) {
    // Append a new chunk with padding NULLs
    chunk *c = New chunk;
    strbuf buf;

    padWithZeros(buf, len - realLength);

    c->alterContents(str(buf));

    chunks.insert_tail(c);

    cb();
  }
}

// Final step of a shrinking truncate: a chunk boundary now exists at the
// new end, with post being the first chunk past it (possibly NULL).  Delete
// post and every chunk after it, then call cb.
void
chunkable::truncate_after_split(callback<void>::ref cb,
                                cbe error,
                                chunk *pre, chunk *post)
{
  for (chunk *doomed = post; doomed; ) {
    // Grab the successor before the node is unlinked and freed
    chunk *following = doomed->link.next;
    delete chunks.remove(doomed);
    doomed = following;
  }

  cb();
}

// Entry point of the multi-phase flush.  This function performs the first
// (synchronous) phase and then hands off to flush_preload, which drives the
// remaining phases and eventually calls cb.
void
chunkable::flush(callback<void>::ref cb, cbe error)
{
# ifdef DEBUG_FLUSH
  trFlush << "Begin first stage with\n";
  debug_dumpChunks();
# endif

  // First phase.  Merge every run of adjacent dirty chunks into a single
  // dirty chunk to make the later phases simpler.
  for (chunk *head = chunks.first; head; head = head->link.next) {
    // A run needs a dirty chunk ...
    if (head->state != CHUNK_DIRTY)
      continue;
    // ... immediately followed by another dirty chunk
    chunk *follower = head->link.next;
    if (!follower || follower->state != CHUNK_DIRTY)
      continue;

    // Swallow the dirty successors one at a time
    strbuf merged(head->content);
    while (head->link.next && head->link.next->state == CHUNK_DIRTY) {
      chunk *absorbed = chunks.remove(head->link.next);
      merged << absorbed->content;
      delete absorbed;
    }

    // The head of the run now carries the whole run
    head->alterContents(str(merged));
  }

# ifdef DEBUG_FLUSH
  trFlush << "After first stage\n";
  debug_dumpChunks();
# endif

  // Second phase.  Pre-load the chunks covering the window::size bytes in
  // front of each dirty chunk so the window is valid by the time it reaches
  // dirty data.  That part is asynchronous and lives in flush_preload as a
  // partially continuation-converted loop.
  flush_preload(chunks.first, 0, cb, error);
}

// Second flush phase, written as a continuation-converted loop.  Walks the
// chunk list from c (which starts at byte offset pos); for each dirty chunk
// not at offset zero it loads the chunks covering the preceding
// window::size bytes and then resumes behind that dirty chunk.  When the
// list is exhausted it starts the third phase (flush_slide) from the head
// of the list with a freshly reset window.
void
chunkable::flush_preload(chunk *c, unsigned long pos,
                         callback<void>::ref cb, cbe error)
{
  while (c) {
    if (c->state == CHUNK_DIRTY && pos > 0) {
      // Found a dirty chunk.  Preload previous chunks to get previous
      // window::size bytes.  findChunk takes its position by pointer (and
      // rewrites it to a chunk-relative offset), so seek via a local.
      unsigned long windowStart = pos - MIN(pos, window::size);
      chunk *start = findChunk(&windowStart);
      if (!start)
        fatal << "BUG: Unexpected end of chunk list in flush preload seek\n";

      // Yes, this is a nested continuation converted loop.  The outer loop
      // resumes at the chunk after c once the range is loaded.
      flush_preload_loadRange(start, c,
                              wrap(this, &chunkable::flush_preload,
                                   c->link.next, pos+c->length, cb, error),
                              error);
      return;
    }
    pos += c->length;
    c = c->link.next;
  }

# ifdef DEBUG_FLUSH
  trFlush << "After second stage\n";
  debug_dumpChunks();
# endif

  // Third phase.  Slide the fingerprint window over the data.
  c = chunks.first;
  window w(FINGERPRINT_PT);
  w.reset();
  wrapSucksFlushArgs wsfa = {w, c, 0, 0, NULL, 0};
  flush_slide(wsfa, cb, error);
}

// Load every not-yet-loaded chunk in [start, end), one asynchronous load at
// a time, then call cb.  end itself is not loaded.
void
chunkable::flush_preload_loadRange(chunk *start, chunk *end,
                                   callback<void>::ref cb, cbe error)
{
  // Skip ahead over chunks that need no loading
  chunk *cursor = start;
  while (cursor->state != CHUNK_NOT_LOADED && cursor != end)
    cursor = cursor->link.next;

  if (cursor == end) {
    // Reached the end of the range: everything is loaded
    cb();
    return;
  }

  // Load this chunk and resume the scan with its successor
  loadChunk(cursor, wrap(this, &chunkable::flush_preload_loadRange,
                         cursor->link.next, end, cb, error),
            error);
}

// Third flush phase: slide the fingerprint window over the chunk data and
// re-draw chunk boundaries.  s carries the whole loop state (window, current
// chunk and offset, fingerprint fill level, start of the chunk currently
// being accumulated) so the loop can be resumed from a load callback.
//
// Not-loaded chunks are skipped (with a window reset) unless a rechunk is in
// progress, in which case they are loaded first.  For loaded/dirty chunks
// one byte is fed into the window per iteration; a boundary is drawn when
// the fingerprint hits the break value (once the window has seen at least
// window::size bytes) or when the very last byte of the list is reached.
// When the loop runs off the end of the list, the fourth phase
// (flush_flush) is started.
void
chunkable::flush_slide(wrapSucksFlushArgs s,
                       callback<void>::ref cb, cbe error)
{
  while (s.c) {
    if (s.c->state == CHUNK_NOT_LOADED) {
      // Am I still accumulating a chunk?
      if (s.rechunkStart) {
        // Load this chunk so I can continue chunking.  The loop resumes
        // with the same state once the load completes.
        loadChunk(s.c, wrap(this, &chunkable::flush_slide, s, cb, error),
                  error);
        return;
      } else {
        // Nope, just skip this chunk and reset the fingerprint since
        // it's now going to be wrong until it's accumulated another
        // window's worth of real data.
        s.c = s.c->link.next;
        s.w.reset();
        s.fingerprintLen = 0;
      }
    } else if (s.c->state == CHUNK_LOADED || s.c->state == CHUNK_DIRTY) {
      // Slide fingerprint window by one byte
      u_int64_t fingerprint;
      fingerprint = s.w.slide8(s.c->content[s.chunkPos]);
      ++s.fingerprintLen;
      if (!s.rechunkStart) {
        // This must be the beginning of a new rechunk
        s.rechunkStart = s.c;
        s.rechunkStartPos = s.chunkPos;
        assert(s.rechunkStartPos == 0);
      }
      ++s.chunkPos;
      
      // Do I need to draw a chunk boundary?  Either this was the last byte
      // of the last chunk, or the fingerprint matches the break value.
      // XXX This currently doesn't honor the chunk size limits
      if ((s.chunkPos == s.c->length && !s.c->link.next && s.rechunkStart) ||
          (s.fingerprintLen >= window::size &&
           fingerprint % CHUNK_SIZE == CHUNK_BREAK_VALUE)) {
        // Do I need to accumulate up multiple past chunks into the
        // rechunk?  If not, then a split will be sufficient to place
        // the new boundary.
        chunk *rechunk = NULL;
        if (s.rechunkStart != s.c) {
          // Gather the contents of the rechunk
          str rechunkContent = gatherChunks(s.rechunkStart, s.rechunkStartPos,
                                            s.c, s.chunkPos);
          // Create a new chunk with the rechunked contents
          rechunk = New chunk;
          rechunk->alterContents(rechunkContent);
        }

        // Split the current chunk into two at the new boundary
        chunk *pre, *post;
        splitChunk(s.c, s.chunkPos, &pre, &post);

        if (rechunk) {
          // Nuke chunks that have been accumulated into the new chunk
          nukeChunks(s.rechunkStart, pre);
          // Replace the nuked chunks with the rechunk
          if (post)
            insertChunkBefore(post, rechunk);
          else
            chunks.insert_tail(rechunk);
          // My old position is invalid.  Place the new one just past
          // the end of the rechunk (which is equivalent to where it
          // was before in terms of overall offset)
          s.c = rechunk;
          s.chunkPos = rechunk->length;
        }
        // Reset chunking state
        s.rechunkStart = NULL;
        s.rechunkStartPos = 0;
      }

      // Move to the next chunk if necessary
      if (s.chunkPos == s.c->length) {
        s.chunkPos = 0;
        s.c = s.c->link.next;
      }
    } else {
      fatal << "BUG: Bad chunk state encountered during flush\n";
    }
  }

# ifdef DEBUG_FLUSH
  trFlush << "After third stage\n";
  debug_dumpChunks();
# endif

  // Fourth phase!  Write all of the dirty chunks to the blob
  flush_flush(chunks.first, cb, error);
}

// Fourth flush phase: starting at c, find the next dirty chunk and put its
// contents into the blob (continuing in flush_flush_after_put).  When no
// dirty chunk remains the flush is complete and cb is called.
void
chunkable::flush_flush(chunk *c, callback<void>::ref cb, cbe error)
{
  // Advance to the next chunk that needs writing
  chunk *dirty = c;
  while (dirty && dirty->state != CHUNK_DIRTY)
    dirty = dirty->link.next;

  if (!dirty) {
# ifdef DEBUG_FLUSH
    trFlush << "After fourth stage\n";
    debug_dumpChunks();
    trFlush << "Flush done\n";
# endif

    cb();
    return;
  }

  blob->put(dirty->content, wrap(this, &chunkable::flush_flush_after_put,
                                 dirty, cb, error),
            error);
}

// Completion of one blob put during the fourth flush phase: record the
// address the blob assigned to chunk c, mark it clean (loaded), and carry on
// with the chunks after it.  bf is not used.
void
chunkable::flush_flush_after_put(chunk *c, callback<void>::ref cb, cbe error,
                                 blockFingerprint bf, blockAddress ba)
{
  c->address = ba;
  c->state = CHUNK_LOADED;

  flush_flush(c->link.next, cb, error);
}

// Compute the total length of the chunkable by summing the lengths of all
// chunks in the list, and pass it to cb.  error is not used.
void
chunkable::length(callback<void, unsigned long>::ref cb, cbe error)
{
  // Accumulate in the same unsigned type the callback receives; an int
  // accumulator would truncate or overflow for large chunkables.
  unsigned long total = 0;

  for (chunk *c = chunks.first; c; c = c->link.next)
    total += c->length;

  cb(total);
}

// Serialize the chunk list as a sequence of (blockAddress, length) records
// and pass the result to cb.  Marshalling a chunkable that still has dirty
// chunks is a fatal bug.  A writer failure is reported through error with
// PERR_INTERNAL.
void
chunkable::marshall(callback<void, marshalled>::ref cb,
                    cbe error)
{
  strWriter w;

  chunk *cur = chunks.first;
  while (cur) {
    if (cur->state == CHUNK_DIRTY)
      fatal << "BUG: Attempt to marshall a dirty chunkable\n";

    // One record per chunk: raw address bytes, then the length
    w.write(&cur->address, sizeof(blockAddress));
    w.writeLong(cur->length);

    cur = cur->link.next;
  }

  if (!w.hasError())
    cb(w.getStr());
  else
    error(PERR_INTERNAL);
}

// Debugging aid: write one line per chunk to warn, followed by the summed
// length of all chunks.  Each line shows a state marker, the chunk length,
// the address (for chunks that are not dirty) and the first 20 characters
// of the content (for chunks that are not in the not-loaded state).
void
chunkable::debug_dumpChunks()
{
  chunk *c;
  unsigned long len = 0;
  
  for (c = chunks.first; c; c = c->link.next) {
    // State marker: (-) not loaded, ( ) loaded, (D) dirty, (?) anything else
    if (c->state == chunkable::CHUNK_NOT_LOADED)
      warn << "(-) ";
    else if (c->state == chunkable::CHUNK_LOADED)
      warn << "( ) ";
    else if (c->state == chunkable::CHUNK_DIRTY)
      warn << "(D) ";
    else
      warn << "(?) ";

    warn << c->length;

    // The address is only printed for the two non-dirty states
    if (c->state == chunkable::CHUNK_NOT_LOADED ||
        c->state == chunkable::CHUNK_LOADED)
      warn << ":" << c->address;

    // A content preview is printed unless the chunk is not loaded
    if (c->state != chunkable::CHUNK_NOT_LOADED)
      warn << " \t" << debug_unparseStr(substr(c->content.cstr(), 0, 20));

    warn << "\n";

    len += c->length;
  }
  warn << "Total: " << len << "\n";
}

// Make sure chunk c has its contents available, then call cb.  A chunk in
// the not-loaded state is fetched from the blob by address (continuing in
// loadChunk_after_get); any other state completes immediately.
void
chunkable::loadChunk(chunk *c,
                     callback<void>::ref cb, cbe error)
{
  if (c->state != CHUNK_NOT_LOADED) {
    // Contents are already here
    cb();
    return;
  }

  blob->getDirect(c->address, wrap(this, &chunkable::loadChunk_after_get,
                                   c, cb, error),
                  error);
}

// Completion of a blob fetch for chunk c.  Checks that the blob returned
// the address and the size the chunk expects (either mismatch is a fatal
// bug), stores the content, marks the chunk loaded and calls cb.
void
chunkable::loadChunk_after_get(chunk *c,
                               callback<void>::ref cb, cbe error,
                               str content, blockAddress ba)
{
  // Sanity checks against the in-memory record
  if (ba != c->address)
    fatal << "BUG: Chunk address does not match loaded address\n";
  if (content.len() != c->length)
    fatal << "BUG: Size mismatch betwen in-memory chunk and loaded chunk\n";

  c->content = content;
  c->state = CHUNK_LOADED;

  cb();
}

// Load every chunk from first through last (inclusive), one after another,
// then call cb.  last must be reachable from first via the list links.
void
chunkable::loadChunks(chunk *first, chunk *last,
                      callback<void>::ref cb, cbe error)
{
  if (first == last) {
    // Final chunk of the range: its completion is the caller's completion
    loadChunk(first, cb, error);
    return;
  }

  // Load this one, then recurse on the rest of the range
  loadChunk(first, wrap(this, &chunkable::loadChunks,
                        first->link.next, last, cb, error),
            error);
}

// Locate the chunk containing absolute byte offset *pos.  On success the
// chunk is returned and *pos has been rewritten to the offset within that
// chunk.  If the offset lies at or past the end of the data, NULL is
// returned and *pos holds the distance past the end.
chunkable::chunk*
chunkable::findChunk(unsigned long *pos)
{
  for (chunk *cur = chunks.first; cur; cur = cur->link.next) {
    // Does the offset fall inside this chunk?
    if (*pos < cur->length)
      return cur;

    // No: make the offset relative to the next chunk and keep going
    *pos -= cur->length;
  }

  // Ran off the end of the list before reaching pos
  return NULL;
}

// Ensure there is a chunk boundary firstLen bytes into chunk c.  On return
// *pre (if pre is non-NULL) is the chunk ending at the boundary and *post
// (if post is non-NULL) is the chunk starting at it; either may be NULL at
// the ends of the list.  Returns true if c actually had to be cut in two,
// false if the boundary already existed.  Cutting a not-loaded chunk or
// asking for a boundary beyond c's length is a fatal bug.
bool
chunkable::splitChunk(chunk *c, unsigned long firstLen,
                      chunk **pre, chunk **post)
{
  if (firstLen > c->length)
    fatal << "BUG: Attempt to split a chunk beyond its length\n";

  // Boundary at the front of c: nothing to cut
  if (firstLen == 0) {
    if (pre) {
      // The predecessor has to be found by walking from the head
      chunk *before = NULL;
      if (chunks.first != c) {
        before = chunks.first;
        while (before->link.next != c)
          before = before->link.next;
      }
      *pre = before;
    }
    if (post)
      *post = c;

    return false;
  }

  // Boundary at the back of c: nothing to cut either
  if (firstLen == c->length) {
    if (pre)
      *pre = c;
    if (post)
      *post = c->link.next;
    return false;
  }

  // A real cut is needed, which requires the contents
  if (c->state == CHUNK_NOT_LOADED)
    fatal << "BUG: Attempt to split non-loaded chunk\n";

  // The tail half goes into a new chunk (taken before c is shortened);
  // c keeps the first firstLen bytes.
  chunk *tailHalf = New chunk;
  tailHalf->alterContents(substr(c->content, firstLen));
  c->alterContents(substr(c->content, 0, firstLen));
  insertChunkAfter(c, tailHalf);

  if (pre)
    *pre = c;
  if (post)
    *post = tailHalf;
  return true;
}

// Asynchronous front end for splitChunk(): loads c first when a real cut is
// required and c is not loaded, then splits at firstLen and passes the
// resulting (pre, post) pair to cb.
void
chunkable::loadAndSplitChunk(chunk *c, unsigned long firstLen,
                             callback<void, chunk*, chunk*>::ref cb,
                             cbe error)
{
  if (firstLen > c->length)
    fatal << "BUG: Attempt to load and split a chunk beyond its length\n";

  // Contents are only needed when the boundary falls strictly inside c
  const bool cutsInside = firstLen > 0 && firstLen < c->length;

  if (cutsInside && c->state == CHUNK_NOT_LOADED) {
    // Load, then come back through here to do the split
    loadChunk(c, WRAP(loadAndSplitChunk, c, firstLen, cb, error), error);
    return;
  }

  // No loading necessary
  chunk *pre = NULL;
  chunk *post = NULL;
  splitChunk(c, firstLen, &pre, &post);
  cb(pre, post);
}

// Unlink and delete every chunk from first through last, inclusive.  Both
// ends must be non-NULL, and last must be reachable from first.
void
chunkable::nukeChunks(chunk *first, chunk *last)
{
  if (!first || !last)
    fatal << "BUG: Attempt to nuke chunks in NULL range\n";

  // Everything before last ...
  for (chunk *doomed = first; doomed != last; ) {
    chunk *following = doomed->link.next;
    delete chunks.remove(doomed);
    doomed = following;
  }

  // ... and finally last itself
  delete chunks.remove(last);
}

// Link chunk c into the list immediately in front of pos.  Both arguments
// must be non-NULL.
void
chunkable::insertChunkBefore(chunk *pos, chunk *c)
{
  if (!pos)
    fatal << "BUG: Attempt to insert chunk before NULL chunk\n";
  if (!c)
    fatal << "BUG: Attempt to insert NULL chunk before chunk\n";

  // In front of the head is just a head insert
  if (pos == chunks.first) {
    chunks.insert_head(c);
    return;
  }

  // The list type has no insert-before operation, so link c in by hand:
  // c takes over pos's back pointer, the predecessor's next is redirected
  // to c, and pos's back pointer is aimed at c's next field.
  c->link.next = pos;
  c->link.pprev = pos->link.pprev;
  *(pos->link.pprev) = c;
  pos->link.pprev = &c->link.next;
}

// Link chunk c into the list immediately behind pos.  Both arguments must
// be non-NULL.
void
chunkable::insertChunkAfter(chunk *pos, chunk *c)
{
  if (!pos)
    fatal << "BUG: Attempt to insert chunk after NULL chunk\n";
  if (!c)
    fatal << "BUG: Attempt to insert NULL chunk after chunk\n";

  // Behind the last element is a tail insert; otherwise it is the same as
  // inserting in front of pos's successor.
  chunk *successor = pos->link.next;
  if (!successor)
    chunks.insert_tail(c);
  else
    insertChunkBefore(successor, c);
}

// Concatenate the bytes from offset firstPos within chunk first up to (not
// including) offset lastPos within chunk last and return them as one str.
// Every chunk in the range must have its contents available, and last must
// be reachable from first; violations are fatal bugs.
str
chunkable::gatherChunks(chunk *first, unsigned long firstPos,
                        chunk *last, unsigned long lastPos)
{
  strbuf gathered;

  // All chunks in front of last contribute from the current offset to their
  // end; the offset is only non-zero for the very first chunk.
  chunk *cur = first;
  unsigned long offset = firstPos;
  for (; cur != last; cur = cur->link.next, offset = 0) {
    if (!cur)
      fatal << "BUG: Hit end of chunks before gathering up to last\n";
    if (cur->state == CHUNK_NOT_LOADED)
      fatal << "BUG: Attempt to gather non-loaded chunks\n";

    if (offset == 0)
      gathered << cur->content;
    else
      gathered << substr(cur->content, offset);
  }

  // The last chunk contributes only up to lastPos
  if (last->state == CHUNK_NOT_LOADED)
      fatal << "BUG: Attempt to gather non-loaded chunks\n";

  gathered << substr(last->content, offset, lastPos - offset);

  return str(gathered);
}

// Append padLen zero bytes to buf, in pieces of at most 128 bytes.
void
chunkable::padWithZeros(strbuf buf, unsigned long padLen)
{
  // One reusable block of zeros
  char zeroBlock[128];
  bzero(zeroBlock, sizeof(zeroBlock));
  str zeros(zeroBlock, sizeof(zeroBlock));

  // Whole blocks first ...
  unsigned long remaining = padLen;
  while (remaining >= sizeof(zeroBlock)) {
    buf << zeros;
    remaining -= sizeof(zeroBlock);
  }

  // ... then whatever is left over
  if (remaining)
    buf << substr(zeros, 0, remaining);
}
