#include "arpeggio.h"
#include  "chordfingerpns.h"
#include "observers/chordobserver.h"
#include "p2psim/network.h"
#include <stdio.h>
#include <iostream>
#include <algorithm>
#include <numeric>
#include <functional>
#include <ext/numeric>
#include <ext/algorithm>
#include <deque>
#include <math.h>
#include <ext/hash_set>
#include <ext/hash_map>

using namespace std;
using namespace __gnu_cxx;

extern bool static_sim;

#define DEBUGLEVEL 0
#define ARPDEBUG(x) if(DEBUGLEVEL >= (x)) (cerr << now() \
                                           << ": " << me.ip << " (" \
                                           << me.id << "): ")
#define NUMLEAVES 10
#define QUERYFILE "queries.txt"
#define RESULTSFILE "results.txt"
#define NUMRESULTS 1500000 
#define INSERT_INIT_DELAY 300000
#define INSERT_DELAY 1000
#define MAX_KEYWORDS 12

set<ConsistentHash::CHID> Arpeggio::allArpeggioIDs;
hash_map<ConsistentHash::CHID, Chord::IDMap> Arpeggio::allArpeggioIDsToIDMap;
FILE * Arpeggio::queryFile = NULL;
FILE * Arpeggio::resultsFile = NULL;

void
Arpeggio::printTime(void *a)
{
  cerr << now() << endl;
  delaycb(1000, &Arpeggio::printTime, (void *) 0);
}

Arpeggio::Arpeggio(IPAddress ip, Args& a, LocTable *l, const char *name)
  : ChordFingerPNS(ip, a, l, name)
{
  syncRunning = false;
  syncOutstanding = 0;
  numReplicas = a.nget<uint>("numreplicas", 2, 10);
  syncTimer = a.nget<uint>("synctimer", 300000, 10);
  kssK = a.nget<uint>("kssk", 3, 10);

  epoch = 0;

  //  ARPDEBUG(3) << "Created arpeggio" << endl;

  leafOfflineTime = 300 * 1000;
  
  leafClassProb.push_back(0.25);
  leafClassIdleTime.push_back(30 * 1000);
  leafClassOfflineProb.push_back(0.1);

  leafClassProb.push_back(0.75);
  leafClassIdleTime.push_back(300 * 1000);
  leafClassOfflineProb.push_back(0.2);

  for (int i = 0; i < NUMLEAVES; i++) {
    leafStates.push_back(-1);
  }
  for (int i = 0; i < NUMLEAVES; i++) {
    leafInsertEpoch.push_back(0);
  }
  for (int i = 0; i < NUMLEAVES; i++) {
    leafInsertQueue.push_back(vector<long>());
  }
  for (int i = 0; i < NUMLEAVES; i++) {
    leafInsertGenerated.push_back(false);
  }
  epoch = 0;

  if (ip == 1) {
    delaycb(0, &Arpeggio::printTime, (void *)0);
  }
}

bool
operator==(const Metadata & x, const Metadata & y)
{
  return x.filename == y.filename &&
    x.keywordsAsString() == y.keywordsAsString();
}

template<class BT, class AT, class RT> 
bool Arpeggio::replicaRPC(const string & indexname,
                          void (BT::* fn)(AT *, RT *),
                          AT *args, RT *ret, u_long type, u_long size,
                          Time timeout, int numReplicas)
{
  if (numReplicas == 0) {
    numReplicas = this->numReplicas;
  }
  
  lookup_args *a = New lookup_args;
  a->key = ConsistentHash::ipname2chid(indexname.c_str());
  a->start = now();
  a->latency = 0;
  a->num_to = 0;
  a->total_to = 0;
  a->retrytimes = 0;
  a->hops = 0;
  vector<IDMap> succs = lookup_internal(a);

  if (succs.size() == 0) {
    return false;
  }
  
  int replica = rand() % numReplicas;
  if (replica >= succs.size()) {
    replica = 0;
  }
  IPAddress ip = succs[replica].ip;

  record_stat(me.ip, ip, type, 0, size);
  
  return doRPC(ip, fn, args, ret, timeout);
}

void
Arpeggio::lookup(Args *args)
{
  ARPDEBUG(1) << "lookup: " << (*args)["key"] << endl;
  
  query_args qa;
  query_result qr;
  vector<string> indexKeywords;
  
  qa.keywords = tokenize((*args)["query"], " ");
  sort(qa.keywords.begin(), qa.keywords.end());

  
  if (qa.keywords.size() > kssK) {
    random_sample_n(qa.keywords.begin(), qa.keywords.end(),
                    back_insert_iterator<vector<string> >(indexKeywords),
                    kssK);
  } else {
    indexKeywords = qa.keywords;
  }

  qa.indexName = Metadata::keywordsVectorToString(indexKeywords);

  u_long rpcsize = qa.indexName.size() + 1 + (*args)["query"].size()+1; 
  
  if (!replicaRPC(qa.indexName, &Arpeggio::remoteQuery, &qa, &qr, 25,
                  rpcsize)) {
    return;
  }
  u_long ressize = 0;
  ARPDEBUG(1) << "lookup: " << qa.indexName << " " << qr.results.size() << " results" << endl;

  for (vector<Metadata>::iterator it = qr.results.begin();
       it != qr.results.end(); it++) {
    ressize += it->bwcost();
    
    ARPDEBUG(1) << "lookup: " << qa.indexName << " " << it->filename << endl;
  }
  record_stat(qr.source.ip, me.ip, 25, 0, ressize);
}


void
Arpeggio::lookupAndFree(Args *args)
{
  lookup(args);
  delete args;
}


void
Arpeggio::join(Args *args)
{
  ARPDEBUG(1) << "join: " << endl;
  ChordFingerPNS::join(args);

  allArpeggioIDs.insert(me.id);
  allArpeggioIDsToIDMap[me.id] = me;

  if (!syncRunning) {
    delaycb(syncTimer, &Arpeggio::rescheduleSynchronize, (void *) 0);
  }

  fill(leafStates.begin(), leafStates.end(), -1);
  fill(leafInsertEpoch.begin(), leafInsertEpoch.end(), 0);
  for (vector<vector<long> >::iterator it = leafInsertQueue.begin();
         it != leafInsertQueue.end(); it++) {
    it->clear();
  }
  epoch++;

  for (int i = 0; i < leafStates.size(); i++) {
      delaycb(next_exponential(leafOfflineTime),
              &Arpeggio::simulateLeaf, make_pair(i, epoch));
  }
}


void
Arpeggio::crash(Args *args)
{
  ARPDEBUG(1) << "crash: " << endl;
  ChordFingerPNS::crash(args);

  allArpeggioIDs.erase(me.id);
}


void
Arpeggio::insert(Args *args)
{  
  ARPDEBUG(1) << "insert: " << (*args)["key"] << endl;

  Metadata md;
  md.filename = (*args)["filename"];
  md.keywords = tokenize((*args)["keywords"], " ");
  recursiveAddWithIG(md);
}


void
Arpeggio::insertAndFree(Args *args)
{
  insert(args);
  delete args;
}


void
Arpeggio::remoteQuery(query_args *a, query_result *r)
{
  ARPDEBUG(1) << "Received query" << endl;

  r->source = me;
  sort(a->keywords.begin(), a->keywords.end());

  ARPDEBUG(1) << "Searching index " << a->indexName << endl;
  
  pair<hash_multimap<string, Metadata>::iterator,
    hash_multimap<string, Metadata>::iterator> its =
    mdIndex.equal_range(a->indexName);
  
  for (hash_multimap<string, Metadata>::iterator it = its.first;
       it != its.second; it++) {
    r->results.push_back(it->second);
  }
}


void
Arpeggio::remoteInsert(insert_args *a, insert_result *r)
{
  ARPDEBUG(1) << "Received insert" << endl;
  addMetadataToIndexes(a->entry);
  
  delete a;
  delete r;
}


ConsistentHash::CHID
Arpeggio::replicatedKeyspace(ConsistentHash::CHID id, int numReplicas)
{
//  ARPDEBUG(1) << "finding replicated keyspace for " << id << endl;
  vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_HEALTHY);
//  vector<IDMap> preds = loctable->preds(me.id-1, _nsucc, LOC_HEALTHY);
  // Oh, hell, that doesn't work, let's just cheat.
  vector<CHID> preds = getPreds(me.id, _nsucc);
  deque<CHID> predsandsuccs;
  
//   for (vector<IDMap>::reverse_iterator it = preds.rbegin();
//        it != preds.rend(); it++) {
//     if (find(succs.begin(), succs.end(), *it) == succs.end()) {
//       if (seen.find(it->id) == seen.end()) {
//         seen.insert(it->id);
//         predsandsuccs.push_back(it->id);
//       }      
//     }
//   }
//   predsandsuccs.push_back(me);
//   for (vector<IDMap>::iterator it = succs.begin(); it != succs.end();
//        it++) {
//     if (seen.find(it->id) == seen.end()) {
//       seen.insert(it->id);
//       predsandsuccs.push_back(*it);
//     }
//   }

  predsandsuccs.push_back(me.id);

  for (vector<IDMap>::iterator it = succs.begin(); it != succs.end();
       it++) {
      if (find(predsandsuccs.begin(), predsandsuccs.end(), it->id) ==
          predsandsuccs.end()) {
        predsandsuccs.push_back(it->id);
      }
  }

  for (vector<CHID>::iterator it = preds.begin();
       it != preds.end(); it++) {
      if (find(predsandsuccs.begin(), predsandsuccs.end(), *it) ==
          predsandsuccs.end()) {
          predsandsuccs.push_front(*it);
      }
  }
  
  int ind;
  bool found = false;

//   for (ind = 0; ind < predsandsuccs.size(); ind++) {
//     ARPDEBUG(3) << "    " << predsandsuccs[ind] << endl;
//   }

  
  
  for (ind = 0; ind < predsandsuccs.size(); ind++) {
    if (predsandsuccs[ind] == id) {
      found = true;
      break;
    }
  }
  if (!found) {
    ARPDEBUG(1) << "can't find replicated keyspace for " << id << ": not in pred/succ list!" << endl;
  }

  if (numReplicas > predsandsuccs.size()) {
    return id;
  } else if (ind - numReplicas > 0) {
    return predsandsuccs[ind-numReplicas];
  } else {
    return predsandsuccs[(ind-numReplicas) % predsandsuccs.size()];
  }
}


vector<string>
Arpeggio::replicatedIndexes(ConsistentHash::CHID id, int numReplicas)
{
  ConsistentHash::CHID b = id;
  ConsistentHash::CHID a = replicatedKeyspace(id, numReplicas);
  vector<string> indexNames;
//  ARPDEBUG(3) << "keyspace is " << a << "  " << b << endl;
  for (hash_map<ConsistentHash::CHID, string>::iterator it =
         keywordsHashMap.begin(); it != keywordsHashMap.end(); it++) {
    if (ConsistentHash::betweenrightincl(a, b, it->first)) {
      indexNames.push_back(it->second);
    }
  }

  return indexNames;
}


void
Arpeggio::replSendOfferBlocks(IDMap node)
{
  vector<string> indexes = replicatedIndexes(node.id, numReplicas);
  offerBlocks_args *ofa = New offerBlocks_args();
  offerBlocks_result *ofr = New offerBlocks_result();
  hash<Metadata> h;

  ofa->source = me;
  
  for (vector<string>::iterator it = indexes.begin();
       it != indexes.end(); ++it) {
    pair<hash_multimap<string, Metadata>::iterator,
      hash_multimap<string, Metadata>::iterator> its =
      mdIndex.equal_range(*it);
    
    for (hash_multimap<string, Metadata>::iterator it2 = its.first;
         it2 != its.second; it2++) {
      ofa->hashes.insert(h(it2->second));
    }
  }
  
  ARPDEBUG(3) << "Offering " << ofa->hashes.size() << " blocks to " <<
    node.ip << endl;

  if (!ofa->hashes.empty()) {
    record_stat(me.ip, node.ip, 27, 0, ofa->hashes.size()*4);
    asyncRPC(node.ip, &Arpeggio::remoteOfferBlocks, ofa, ofr);
  }
}


void
Arpeggio::remoteOfferBlocks(offerBlocks_args *a, offerBlocks_result *r)
{
  ARPDEBUG(3) << "Received offer of " << a->hashes.size()
           << " blocks from " << a->source.ip << endl;
  replGetMissingBlocks(a->source, a->hashes);
  delete a;
  delete r;
}


void
Arpeggio::replGetMissingBlocks(const IDMap & remote,
                               const set<size_t> & blockIDs)
{
  requestBlocks_args ra;
  requestBlocks_result rr;
  
  for (set<size_t>::const_iterator it = blockIDs.begin();
       it != blockIDs.end(); it++) {
    hash_map<size_t, Metadata>::iterator mapit;
    mapit = metadataHashMap.find(*it);
    if (mapit == metadataHashMap.end()) {
      ra.hashes.push_back(*it);
    }
  }

  ARPDEBUG(3) << "Getting " << ra.hashes.size() << " blocks from " <<
    remote.ip << endl;

  record_stat(me.ip, remote.ip, 27, 0, ra.hashes.size()*4);
  if (!doRPC(remote.ip, &Arpeggio::remoteRequestBlocks, &ra, &rr)) {
    return;
  }

  ARPDEBUG(3) << "Got " << rr.entries.size() << " blocks from " <<
    remote.ip << endl;

  u_int ressize=0;
  
  for (vector<Metadata>::iterator it = rr.entries.begin();
       it != rr.entries.end(); it++) {
    addMetadataToIndexes(*it);
    ressize += it->bwcost();
  }
  record_stat(remote.ip, me.ip, 27, 0, ressize);

}


void
Arpeggio::remoteRequestBlocks(requestBlocks_args *a,
                              requestBlocks_result *r)
{
  for (vector<size_t>::iterator it = a->hashes.begin();
       it != a->hashes.end(); it++) {
    r->entries.push_back(metadataHashMap[*it]);
  }
}


void
Arpeggio::replOfferToAll()
{
  vector<IDMap> succs = loctable->succs(me.id+1, numReplicas, LOC_HEALTHY);
  vector<CHID> preds = getPreds(me.id, numReplicas);

  for (vector<IDMap>::iterator it = succs.begin(); it != succs.end();
       it++) {
    replSendOfferBlocks(*it);
  }
  
  for (vector<CHID>::iterator it = preds.begin(); it != preds.end();
       it++) {
    replSendOfferBlocks(allArpeggioIDsToIDMap[*it]);
  }
}


void
Arpeggio::rescheduleSynchronize(void *)
{
  if (!alive()) {
    syncRunning = false;
    return;
  }

  syncRunning = true;
  
  if (syncOutstanding > 0) {
    assert(0);
  } else {
    syncOutstanding++;
    replOfferToAll();
    syncOutstanding--;
    assert(syncOutstanding == 0);
  }
  delaycb(syncTimer, &Arpeggio::rescheduleSynchronize, (void *) 0);
}


template<class T>
struct indexInto
{
  const vector<T> & vec;
  
  indexInto(const vector<T> & v) : vec(v)
  {
  }

  T operator()(int i)
  {
    return vec[i];
  }
};


vector<vector<string> >
Arpeggio::generateKeywordSets(const vector<string> & keywords, int K)
{
  vector<vector<string> > ret;
  vector<string> tmp;

  int ksize = keywords.size();
  if (ksize > MAX_KEYWORDS) {
    ksize = MAX_KEYWORDS;
  }
  
  
  if (K > keywords.size()) {
    K = keywords.size();
  }

  for (int k = 1; k <= K; k++) {
    vector<int> ind(k);
    iota(ind.begin(), ind.end(), 0);
    
    // yield set
    tmp.clear();
    transform(ind.begin(), ind.end(),
              back_insert_iterator<vector<string> >(tmp),
              indexInto<string>(keywords));
    ret.push_back(tmp);

    bool done = false;
    while (!done) {
      // Find next possible set of unique indices
      for (int i = k-1; i >= 0; i--) {
        ind[i] ++;
        if (ind[i] == ksize) {
          ind[i] = 0;
          if (i == 0) {
            done = true;
          }
        } else {
          break;
        }
      }

      if (done) {
        break;
      }

      if (! is_sorted(ind.begin(), ind.end())) {
        continue;
      }

      if (unique(ind.begin(), ind.end()) != ind.end()) {
        continue;               // not unique
      }

      tmp.clear();
      transform(ind.begin(), ind.end(),
                back_insert_iterator<vector<string> >(tmp),
                indexInto<string>(keywords));
      ret.push_back(tmp);
    }
  }
  return ret;
}


void
Arpeggio::testKSS()
{
  vector<string> test;
  test.push_back("foo");
  test.push_back("bar");
  test.push_back("baz");
  test.push_back("quux");
  
  cout << "testing" << endl;
  vector<vector<string> > test2 = generateKeywordSets(test, 3);
  cout << "done" << endl;
  
  for (vector<vector<string> >::iterator it = test2.begin();
       it != test2.end(); it++) {
    for (vector<string>::iterator it2 = it->begin(); it2 != it->end(); it2++) {
      cout << *it2 << " ";
    }
    cout << endl;
  }

}


void
Arpeggio::addMetadataToIndexes(Metadata md)
{
  ARPDEBUG(3) << "adding block " << md.filename << endl;
  
  hash<Metadata> h;

  sort(md.keywords.begin(), md.keywords.end());

  if (metadataHashMap.find(h(md)) != metadataHashMap.end()) {
    ARPDEBUG(1) << "insertMetadata: already have block" << endl;
    return;
  }
  metadataHashMap[h(md)] = md;

  CHID a = replicatedKeyspace(me.id, numReplicas);
  CHID b = me.id;

  ARPDEBUG(3) << "a = " << a << ", b = " << b << endl;
  
  // Figure out the index names
  vector<vector<string> > kssVectors = generateKeywordSets(md.keywords,
                                                           kssK);
  vector<string> indexNames;
  for (vector<vector<string> >::iterator it = kssVectors.begin();
       it != kssVectors.end(); it++) {
    indexNames.push_back(Metadata::keywordsVectorToString(*it));
  }

  for (vector<string>::iterator it = indexNames.begin();
       it != indexNames.end(); it++) {
    if (ConsistentHash::betweenrightincl(
          a, b,
          ConsistentHash::ipname2chid(it->c_str()))) {
      ARPDEBUG(2) << "adding block to index " << *it << endl;
      mdIndex.insert(make_pair(*it, md));
      keywordsHashMap[ConsistentHash::ipname2chid(it->c_str())] =
        *it;
    }
  }
}


void
Arpeggio::findAndAddToIndex(const string & indexName,
                            const Metadata & md)
{
  delaycb(0, &Arpeggio::findAndAddToIndexInternal,
          make_pair(indexName, md));
}


void
Arpeggio::findAndAddToIndexInternal(pair<string, Metadata> args)
{
  const string & indexName = args.first;
  const Metadata & md = args.second;

  insert_args *ia = New insert_args();
  insert_result *ir = New insert_result();

  ia->entry = md;
  replicaRPC(indexName, &Arpeggio::remoteInsert, ia, ir, 26,
             indexName.size()+1+md.bwcost());
}


void
Arpeggio::recursiveAdd(const Metadata & md)
{
  // Figure out the index names
  vector<vector<string> > kssVectors = generateKeywordSets(md.keywords,
                                                           kssK);
  for (vector<vector<string> >::iterator it = kssVectors.begin();
       it != kssVectors.end(); it++) {
    findAndAddToIndex(Metadata::keywordsVectorToString(*it), md);
  }
}


void
Arpeggio::recursiveAddWithIG(const Metadata & md)
{
  gatewayInsert_args ia;
  gatewayInsert_result ir;

  ia.entry = md;
  if (!replicaRPC(md.filename,
                  &Arpeggio::remoteGatewayInsert, &ia, &ir, 26, md.bwcost(),
                  10000, 1)) {
    // gateway insert failed
    recursiveAdd(md);
  }
}


void
Arpeggio::remoteGatewayInsert(gatewayInsert_args *a,
                              gatewayInsert_result *r)
{
  if (gatewaySeen.find(a->entry.filename) == gatewaySeen.end()) {
    // Haven't seen it before
    ARPDEBUG(2) << "IG: adding " << a->entry.filename << endl;
    gatewaySeen.insert(a->entry.filename);
    recursiveAdd(a->entry);
  }
}


// Borrowed from http://www.digitalpeer.com/id/simple
vector<string>
Arpeggio::tokenize(const string& str,const string& delimiters)
{
  vector<string> tokens;
    	
  // skip delimiters at beginning.
  string::size_type lastPos = str.find_first_not_of(delimiters, 0);
    	
  // find first "non-delimiter".
  string::size_type pos = str.find_first_of(delimiters, lastPos);

  while (string::npos != pos || string::npos != lastPos)
  {
    // found a token, add it to the vector.
    tokens.push_back(str.substr(lastPos, pos - lastPos));
		
    // skip delimiters.  Note the "not_of"
    lastPos = str.find_first_not_of(delimiters, pos);
		
    // find next "non-delimiter"
    pos = str.find_first_of(delimiters, lastPos);
  }

  return tokens;
}


vector<ConsistentHash::CHID>
Arpeggio::getPreds(CHID id, int n)
{
  set<CHID>::iterator it = allArpeggioIDs.lower_bound(id);
  vector<CHID> r;


  for (int i = 0; i < n; i++) {
      if (it == allArpeggioIDs.begin()) {
          it = allArpeggioIDs.end();
      }
      it--;
      r.push_back(*it);
  }

  return r;
}


void
Arpeggio::simulateLeaf(pair<int, int> args)
{
  int leafNum = args.first;
  int epochNum = args.second;

  if (epochNum != epoch || !alive()) {
    return;
  }

  if (leafStates[leafNum] == -1) {
    // Leaf is currently offline. Bring it back online.
    ARPDEBUG(2) << "leaf " << leafNum << " online" << endl;
    double r = ( (double)random() / (double)(RAND_MAX) );
    for (int i = 0; i < leafClassProb.size(); i++) {
      if (r <= leafClassProb[i]) {
        leafStates[leafNum] = i;
        break;
      } else {
        r -= leafClassProb[i];
      }
    }

    leafInsertGenerated[leafNum] = false;
    if (leafStates[leafNum] != 0) {
      delaycb(INSERT_INIT_DELAY, &Arpeggio::simulateLeafInsert,
              make_pair(leafNum,
                        make_pair(epoch,
                                  leafInsertEpoch[leafNum])));
    }
  } else {
    // Do a query
    ARPDEBUG(2) << "leaf " << leafNum << " querying" << endl;
    Args *a = New Args();
    string queryStr = getRandomQuery();
    vector<string> queryToks = tokenize(queryStr, "\t");
    
    (*a)["query"] = queryToks[0];
    (*a)["fullquery"] = queryToks[1];
    
    delaycb(0, &Arpeggio::lookupAndFree, a);
    
    double r = ( (double)random() / (double)(RAND_MAX) );
    if (r <= leafClassOfflineProb[leafStates[leafNum]]) {
      // Leaf going offline
      leafStates[leafNum] = -1;
      ARPDEBUG(2) << "leaf " << leafNum << " offline" << endl;
      leafInsertEpoch[leafNum]++;
      leafInsertQueue[leafNum].clear();
    }
  }

  // Schedule the next appropriate event
  Time delay;
  if (leafStates[leafNum] == -1) {
    delay = next_exponential(leafOfflineTime);
  } else {
    delay = next_exponential(leafClassIdleTime[leafStates[leafNum]]);
  }

  ARPDEBUG(2) << "delay = " << delay << endl;
  
  delaycb(delay, &Arpeggio::simulateLeaf, make_pair(leafNum, epoch));
}

void
Arpeggio::simulateLeafInsert(pair<int, pair<int, int> > args)
{
  int leafNum = args.first;
  int epochNum = args.second.first;
  int leafEpochNum = args.second.second;

  if (epochNum != epoch || !alive() ||
      leafInsertEpoch[leafNum] != leafEpochNum) {
    return;
  }

  if (!leafInsertGenerated[leafNum]) {
    int n = numberOfFilesForHost();
    ARPDEBUG(2) << "leaf has " << n << " files" << endl;
    leafInsertQueue[leafNum] = fileRanksForHost(n);
  }

  
  if (leafInsertQueue[leafNum].empty()) {
    return;
  }

  vector<long>::iterator it = leafInsertQueue[leafNum].begin();

  string s = getFilename(*it);

  leafInsertQueue[leafNum].erase(it);
  delaycb(INSERT_DELAY, &Arpeggio::simulateLeafInsert,
          make_pair(leafNum, make_pair(epochNum, leafEpochNum)));
  
  vector<string> toks = tokenize(s, "\t");
  Args *a = New Args();
  (*a)["keywords"] = toks[0];
  (*a)["filename"] = toks[1];
  insertAndFree(a);
}

Time
Arpeggio::next_exponential( u_int mean )
{

  assert( mean > 0 );

  double x = ( (double)random() / (double)(RAND_MAX) );
  u_int rt = (u_int) ((-(mean*1.0))*log( 1 - x ));
  return (Time) rt;
}

double
Arpeggio::pareto(double a, u_int b)
{
  double x = ( (double)random() / (double)(RAND_MAX) );
  double xx = exp(log(1 - x)/a);
  double rt = ((double)b/xx);
  return rt;
}



string
Arpeggio::getRandomQuery()
{
  
  if (queryFile == NULL) {
    queryFile = fopen(QUERYFILE, "r");
  }

  char buf[4096];

//   fseek(queryFile, 0, SEEK_END);
//   long len = ftell(queryFile);
//   len -= 100000;
//   fseek(queryFile, random() % len, SEEK_SET);
//   int n = random() % 5 + 2;
//   for (int i = 0; i < n; i++) {
//     fgets(buf, sizeof(buf), queryFile);
//   }

  fgets(buf, sizeof(buf), queryFile);
  if (buf[strlen(buf)-1] == '\n') {
    buf[strlen(buf)-1] = '\0';
  }
  if (buf[strlen(buf)-1] == '\r') {
    buf[strlen(buf)-1] = '\0';
  }

  return string(buf);
}


string
Arpeggio::getFilename(int i)
{
  if (resultsFile == NULL) {
    resultsFile = fopen(RESULTSFILE, "r");
    if (resultsFile == NULL) {
      perror("fopen");
      assert(0);
    }
  }

  i %= NUMRESULTS;
  
  char buf[2048];
  fseek(resultsFile, 1024*i, SEEK_SET);

  fgets(buf, sizeof(buf), resultsFile);
  if (buf[strlen(buf)-1] == '\n') {
    buf[strlen(buf)-1] = '\0';
  }
  if (buf[strlen(buf)-1] == '\r') {
    buf[strlen(buf)-1] = '\0';
  }

  return string(buf);  
}

int
Arpeggio::numberOfFilesForHost()
{
  double r = ( (double)random() / (double)(RAND_MAX) );
  if (r > 0.93) {
//    return random() % 10000 + 1000;
    return 1000;
  } else {
    r = ( (double)random() / (double)(RAND_MAX) );
    r *= 3;
    r = pow(10,r) + 1;
    return (int) r;
  }
}

/*
set<long>
Arpeggio::fileRanksForHost(int n)
{
  set<long> s;
  long x;
  
  for (int i = 0; i < n; i++) {
    x = (long) pareto(0.5, 100);
    if (s.find(x) == s.end()) {
      s.insert(x);
    } else {
      set<long>::iterator it = s.find(x);
      bool found = false;
      while (1) {
        x++;
        if (++it == s.end()) {
          s.insert(x);
          break;
        }
        if (x != *it) {
          s.insert(x);
          break;
        }
      }
    }
  }

  return s;
}
*/

vector<long>
Arpeggio::fileRanksForHost(int n)
{
  hash_set<long> s;
  long x;
  vector<long> r;
  
  for (int i = 0; i < n; i++) {
    //x = (long) pareto(0.5, 100);
    while (s.find(x) != s.end()) {
      x = random() % NUMRESULTS;
    } 
    s.insert(x);
    r.push_back(x);
  }

  random_shuffle(r.begin(), r.end());
  
  
  return r;
}
