package traces;

import anastore.client.*;
import anastore.net.*;
import anastore.store.NoSuchDatum;
import anastore.util.*;
import anastore.server.Server;

import static traces.Auspex.Type.*;
import static traces.Auspex.Action.*;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ThreadMXBean;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.nio.*;

/**
 * A class that runs Auspex traces (for now) 
 */
public class TraceRunner
{
    // Two log sinks: "log" receives the per-record/per-operation messages,
    // "info" receives progress messages and warnings.
    private static final Log log = new Log();
    private static final Log info = new Log();

    // Upper bound on the number of trace records to read and replay
    // (replay stops earlier if the trace runs out of records)
    private final int NUM_TRACES;
    // true: one transaction spans OPEN..CLOSE of a file;
    // false: every trace operation runs in its own transaction
    private final boolean OPEN_CLOSE_TRANSACTIONS;
    // true: operations whose transaction aborted are retried
    private final boolean RETRY_ABORTED;
    // true: reads use read/write transactions instead of read-only ones
    private final boolean DONT_OPTIMIZE;
    // Passed through to BlockStore.connect for each client
    // NOTE(review): exact semantics/units are defined by BlockStore -- confirm
    private final double FRESHNESS;

    // Base back-off, in milliseconds, used after an aborted transaction
    private static final int BACKOFF_MILLIS = 64;
    // Source of back-off jitter
    private static final Random RANDOM = new Random();

    // Used to sample per-thread CPU time for the statistics output
    private static final ThreadMXBean threadMXBean =
        java.lang.management.ManagementFactory.getThreadMXBean();

    /** The size of a file data block, in bytes */
    private static final int DATA_BLOCK_SIZE = 4096;
    /** The number of direct pointers in an inode */
    private static final int INODE_DIRECT_ENTRIES = 12;
    /** The number of 8-byte pointers that fit in one indirect block */
    private static final int INODE_INDIRECT_ENTRIES = DATA_BLOCK_SIZE/8;
    /**
     * The size of a metadata (inode) block: a 4-byte block count, the
     * direct pointers, and two more 8-byte pointers (used by route() as
     * the single- and double-indirect pointers)
     */
    private static final int INODE_SIZE = 4+INODE_DIRECT_ENTRIES*8+2*8;

    // The trace file being replayed
    private final File _traceFile;
       
    // Address of the server that this runner starts and connects to
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 3772;

    // Thread running the block server
    private final TraceServer _server;
    
    // All clients (each is its own thread), keyed by trace host id
    private final HashMap<Integer,TraceClient> _clients;
    // Parent thread group of every client thread
    private ThreadGroup _clientThreads;
    // Maps each file id to the block id of its metadata (inode) block
    private final ConcurrentMap<Long,Long> _files;
    // Pool of trace records reused when handing records to clients
    private final LinkedBlockingQueue<Auspex.Record> _recordPool;

    // Byte counters shared by all client connections
    private final StreamStatistics _stats = new StreamStatistics();
    // Name of the file the periodic statistics are written to
    private final String _statsName;
    // Counters reported in the statistics file
    private final AtomicInteger _nROTransactions = new AtomicInteger();
    private final AtomicInteger _nRWTransactions = new AtomicInteger();
    private final AtomicInteger _nAborts = new AtomicInteger();

    /**
     * Constructs a TraceRunner and sets up everything needed to replay
     * a trace: starts the server thread, scans the first
     * {@code numTraces} records of the trace to create one client per
     * host, and creates on the server every file that the trace
     * accesses without creating it first.
     *
     * @param traceFile the Auspex trace file to replay
     * @param numTraces maximum number of trace records to scan and replay
     * @param openClose true to run one transaction per OPEN..CLOSE,
     *                  false to run one transaction per operation
     * @param retryAborted true to retry operations whose transaction aborted
     * @param dontOptimize true to use read/write transactions even for reads
     * @param freshness value handed to each client's BlockStore connection
     * @param statsName name of the file that receives the statistics
     * @throws IOException if the trace cannot be read or the server or
     *                     the initial connection cannot be set up
     */
    private TraceRunner(File traceFile, int numTraces, boolean openClose,
                        boolean retryAborted, boolean dontOptimize,
                        double freshness, String statsName)
        throws IOException {
        NUM_TRACES = numTraces;
        OPEN_CLOSE_TRANSACTIONS = openClose;
        RETRY_ABORTED = retryAborted;
        DONT_OPTIMIZE = dontOptimize;
        FRESHNESS = freshness;
        _statsName = statsName;
        
        //initialize shared data structures
        _traceFile = traceFile;
        _clients = new HashMap<Integer,TraceClient>();
        _files = new ConcurrentHashMap<Long,Long>();
        _recordPool = new LinkedBlockingQueue<Auspex.Record>();

        //make and start a thread for the server
        _server = new TraceServer(SERVER_PORT);
        _server.start();
                
        //setup for the trace
        Auspex trace = new Auspex(traceFile);
        Auspex.Record rec = new Auspex.Record();
        
        //create a client to use to make initial files; the server
        //thread may not be listening yet, so poll until it accepts
        ChannelClient chanClient = new ChannelClient(SERVER_HOST,SERVER_PORT);
        Pair<SendChannel, ReceiveChannel> channels = null;
        while (channels == null) {
            try {
                channels = chanClient.connect();
            } catch (java.net.ConnectException e) {
                try {
                    info.println("Waiting for server...");
                    Thread.sleep(200);      // XXX
                } catch (InterruptedException e2) {
                    // Deliberately ignored: just try to connect again
                }
            }
        }
        SendChannel send = channels.first;
        File cache = new File("/tmp/anastore-cache/init");
        BlockStore store = BlockStore.connect(send, cache);
       
        //all fids opened in the trace
        Set<Long> filesOpened = new HashSet<Long>();
        //all fids opened read or read write first and need to be
        //created before the trace is run and their sizes
        Map<Long,Integer> filesInit = new HashMap<Long,Integer>();
        
        //figure out which files and meta_data blocks need to be
        //created and start clients
        _clientThreads = new ThreadGroup("Clients");
        for(int i = 0; i < NUM_TRACES; i++){
            //get next record
            try {
                trace.next(rec);
            } catch (NoSuchElementException e) {
                break;
            }
                      
            Long fid = Long.valueOf(rec.fid);
            
            //check if its a new client
            if (!_clients.containsKey(rec.host)){
                TraceClient c = new TraceClient(rec.host, _clientThreads);
                _clients.put(rec.host, c);
            }

            // If this is an OPEN, it may be evidence of a file's
            // existence before the trace began
            if (rec.type == OPEN) {
                // Is this the first OPEN we've seen for this file?
                if (filesOpened.add(fid)) {
                    // If the first OPEN is a read or a read/write, it
                    // is evidence of the file's existence and size.
                    // Likewise, if there's already an entry in
                    // filesInit, it indicates there was an ATTR, in
                    // which case we already know the file existed, so
                    // we should update its size.
                    if (rec.action != WRITE ||
                        filesInit.containsKey(fid)) {
                        filesInit.put(fid, rec.size);
                    }
                }
            } else if (rec.type == ATTR || rec.type == DIR ||
                       rec.type == DELETE) {
                // If this file has not been opened before, this is
                // evidence of its existence.  For ATTR, this does not
                // tell its size.  If we later encounter an open,
                // we'll update the size.
                if (!filesOpened.contains(fid))
                    filesInit.put(fid, rec.size);
            }
        }

        //create meta data blocks and initial files
        RWSnapshot snap = store.beginRW();
        try {
            // Commit periodically so we don't wind up with a HUGE
            // transaction
            int blocksSinceCommit = 0;

            int total = filesInit.size(), cur = 0;
            for(Map.Entry<Long, Integer> entry : filesInit.entrySet()) {
                ++cur;
                info.println("Creating file " + cur + " of " + total);

                long fid = entry.getKey();
                int size = entry.getValue();

                //create the meta data block and remember its block id
                //for this file (extendFile looks it up in _files)
                Pair<Long, ByteBuffer> metaBlock = snap.newBlock(INODE_SIZE);
                ++blocksSinceCommit;
                _files.put(fid, metaBlock.first);
                //create the file contents
                blocksSinceCommit += extendFile(snap, fid, size);

                if (blocksSinceCommit > 500) {
                    snap.commit();
                    snap = store.beginRW();
                    blocksSinceCommit = 0;
                }
            }
            snap.commit();
        } catch (AbortedException e) {
            // Shouldn't happen.  There are no contenders
            throw new Bug(e);
        }
    }

    /**
     * Grows a file so that it holds at least {@code bytes} bytes,
     * allocating data blocks (and any indirect blocks needed to reach
     * them) and linking them into the file's inode.
     *
     * @param snap the read/write transaction to do the work in
     * @param fid the file id; must already have an entry in _files
     * @param bytes the required file size in bytes
     * @return the number of data blocks added (0 if the file was
     *         already large enough)
     * @throws AbortedException if the transaction aborts
     */
    private int extendFile(RWSnapshot snap, long fid, int bytes)
        throws AbortedException
    {
        final int wantedBlocks = (bytes + DATA_BLOCK_SIZE - 1) / DATA_BLOCK_SIZE;
        final long inodeId = _files.get(fid);

        // The block count lives in the first int of the inode; a plain
        // read is enough to find out whether there is anything to do
        final int haveBlocks = snap.readBlock(inodeId).getInt(0);
        if (haveBlocks >= wantedBlocks) {
            return 0;
        }

        // Growing: take the inode for writing and record the new count
        ByteBuffer inode = snap.readWriteBlock(inodeId);
        log.println("Extending " + Long.toHexString(fid) +
                    " from " + haveBlocks + " to " + wantedBlocks + " blocks");
        inode.putInt(0, wantedBlocks);

        int blockNum = haveBlocks;
        while (blockNum < wantedBlocks) {
            // Allocate the data block itself
            long dataId = snap.newBlock(DATA_BLOCK_SIZE).first;

            // Walk the pointer path from the inode down to the block
            // holding the final pointer, creating missing indirect
            // blocks (a zero pointer means "not allocated yet")
            int[] path = route(blockNum);
            int leaf = path.length - 1;
            ByteBuffer node = inode;
            for (int level = 0; level < leaf; ++level) {
                int slot = path[level];
                long childId = node.getLong(slot);
                if (childId == 0) {
                    childId = snap.newBlock(DATA_BLOCK_SIZE).first;
                    node.putLong(slot, childId);
                }
                node = snap.readWriteBlock(childId);
            }
            node.putLong(path[leaf], dataId);
            ++blockNum;
        }
        return wantedBlocks - haveBlocks;
    }

    /**
     * Computes where the pointer to a file's data block is stored.
     *
     * The result is a chain of byte offsets: the first is an offset
     * into the inode, each following one is an offset into the block
     * reached through the previous pointer.  One element means a
     * direct pointer, two a single-indirect pointer, three a
     * double-indirect pointer.
     *
     * @param blockNum zero-based index of the data block within the file
     * @return the byte offsets to follow, starting at the inode
     * @throws IllegalArgumentException if blockNum is negative or
     *         beyond what direct, single-indirect and double-indirect
     *         pointers can address
     */
    private static int[] route(int blockNum)
    {
        if (blockNum < 0)
            throw new IllegalArgumentException
                ("Block number " + blockNum + " is negative");

        // Direct pointers follow the 4-byte block count in the inode
        if (blockNum < INODE_DIRECT_ENTRIES)
            return new int[] { 4 + blockNum*8 };

        // Single-indirect: index relative to the end of the direct range
        int single = blockNum - INODE_DIRECT_ENTRIES;
        if (single < INODE_INDIRECT_ENTRIES)
            return new int[] { 4 + INODE_DIRECT_ENTRIES*8,
                               single*8 };

        // Double-indirect: index relative to the end of the
        // single-indirect range (direct + indirect entries)
        int dbl = single - INODE_INDIRECT_ENTRIES;
        if (dbl < INODE_INDIRECT_ENTRIES*INODE_INDIRECT_ENTRIES)
            return new int[] { 4 + INODE_DIRECT_ENTRIES*8 + 8,
                               (dbl/INODE_INDIRECT_ENTRIES)*8,
                               (dbl%INODE_INDIRECT_ENTRIES)*8 };

        throw new IllegalArgumentException
            ("Block number " + blockNum + " too large");
    }

    /**
     * Debugging aid: prints the route() result for block numbers
     * 0..2047 to standard output, one block per line, each offset
     * followed by a space.
     */
    private static void testRoute()
    {
        for (int blockNum = 0; blockNum < 2048; ++blockNum) {
            StringBuilder line = new StringBuilder();
            for (int offset : route(blockNum))
                line.append(offset).append(' ');
            System.out.println(line);
        }
    }

    /**
     * Sums the CPU time of all live threads in a thread group and its
     * subgroups.
     *
     * @param g the thread group to measure
     * @return the total CPU time as reported by
     *         ThreadMXBean.getThreadCpuTime (nanoseconds); threads for
     *         which no time is available contribute nothing
     */
    private static long getThreadGroupCpuTime(ThreadGroup g)
    {
        // enumerate() silently drops threads that do not fit in the
        // array, so a completely filled array may be truncated: keep
        // growing until there is at least one spare slot.
        Thread[] threads = new Thread[Math.max(16, g.activeCount() * 2)];
        int nThreads = g.enumerate(threads, true);
        while (nThreads >= threads.length) {
            threads = new Thread[threads.length * 2];
            nThreads = g.enumerate(threads, true);
        }

        long total = 0;
        for (int i = 0; i < nThreads; ++i) {
            long cpu = threadMXBean.getThreadCpuTime(threads[i].getId());
            // -1 means the thread has died or measurement is disabled;
            // do not let that subtract from the total
            if (cpu > 0)
                total += cpu;
        }
        return total;
    }

    /**
     * A TimeSource driven by trace timestamps: the current time is
     * the most recently set timestamp, measured relative to the first
     * timestamp ever set.
     */
    private static class TraceTimeSource implements TimeSource
    {
        // NaN until the first timestamp has been recorded
        private double _first = Double.NaN;
        private double _now;

        /**
         * Records the latest trace timestamp.  The first call also
         * fixes the origin; later timestamps must not go backwards.
         */
        public void setTime(double now)
        {
            if (!Double.isNaN(_first)) {
                assert now >= _now;
                _now = now;
                return;
            }
            // First timestamp: it becomes both origin and current time
            _now = now;
            _first = now;
        }

        /**
         * Returns the time elapsed between the first and the latest
         * timestamp; setTime must have been called before.
         */
        public double currentTime()
        {
            assert !Double.isNaN(_first);
            double elapsed = _now - _first;
            return elapsed;
        }
    }

    /**
     * A client that takes trace records from a master thread and
     * simulates the action. Clients can run either one transaction
     * per trace record operation or open/close transactions.
     */
    private class TraceClient extends Thread 
    {
        // Client/host id taken from the trace file
        private final int _cid;
        
        // Connection state; created in run() on the client's own thread
        private ChannelClient _chanClient;
        private BlockStore _store;
        // Clock advanced to each record's timestamp before it is replayed
        private final TraceTimeSource _clock = new TraceTimeSource();
        // Hand-off queue (capacity 1) through which the dispatcher
        // passes records to this client
        public final ArrayBlockingQueue<Auspex.Record> _q;
        // Open transactions keyed by file id (open/close mode only)
        private final HashMap<Long,Snapshot> _transactions;
        // Per file id, the OPEN/BLOCK records replayed so far; kept so
        // the whole sequence can be re-run after an abort
        private final Map<Long, List<Auspex.Record>> _sequences =
            new HashMap<Long, List<Auspex.Record>>();
        // Per file id, the next base back-off in milliseconds
        private final Map<Long, Integer> _backoff =
            new HashMap<Long, Integer>();

        /**
         * Creates the replayer thread for one host of the trace.  The
         * thread is placed in its own "Host" subgroup of the given
         * group and is not started here.
         * @param cid client/host id from the trace file
         * @param cgrp parent thread group for this client's group
         */
        public TraceClient(int cid, ThreadGroup cgrp)
            throws IOException
        {
            super(new ThreadGroup(cgrp, "Host " + cid), "Replayer " + cid);
            this._transactions = new HashMap<Long,Snapshot>();
            // Capacity 1: the dispatcher hands over one record at a time
            this._q = new ArrayBlockingQueue<Auspex.Record>(1);
            this._cid = cid;
        }
        
        /**
         * Run trace operations.  Connects to the server, then takes
         * records from the queue one at a time and simulates each
         * until an EOT record arrives.  When RETRY_ABORTED is set, an
         * operation whose transaction aborted is retried after a
         * back-off; in open/close mode the whole OPEN/BLOCK sequence
         * of the file is replayed first.
         */
        public void run()
        {
            Log.setThreadLabel("h" + _cid);
            // start up the client
            try {
                _chanClient = new ChannelClient(SERVER_HOST,SERVER_PORT);
                Pair<SendChannel, ReceiveChannel> channels = _chanClient.connect(_stats);
                SendChannel send = channels.first;
                File cache = new File("/tmp/anastore-cache/cache-"+_cid);
                _store = BlockStore.connect(send, cache,
                                            FRESHNESS,
                                            _clock);
            } catch (IOException e) {
                throw new Bug("Could not start client", e);
            }

            while (true) {
                //grab new record from queue
                Auspex.Record cur;
                try {
                    cur = _q.take();
                } catch (InterruptedException e) {
                    throw new Bug("Thread interrupted!", e);
                }

                if (cur.type == EOT)
                    return;

                log.println("Replaying " + cur);
                assert cur.host == _cid;
                _clock.setTime(cur.timestamp);

                boolean retry;
                do {
                    retry = false;
                    try {
                        process(cur);
                    } catch (NoSuchDatum e) {
                        info.println("Unexpected NoSuchDatum");
                        e.printStackTrace();
                    } catch (AbortedException e) {
                        _nAborts.getAndIncrement();
                        retry = RETRY_ABORTED && recoverFromAbort(cur);
                    }
                    if (retry)
                        log.println("Retrying " + cur);
                } while (retry);

                recycle(cur);
            }
        }

        /**
         * Backs off after an aborted transaction and, in open/close
         * mode, replays the file's earlier records so that the
         * aborted record can be retried.
         * @return true if the aborted record should be retried
         */
        private boolean recoverFromAbort(Auspex.Record cur)
        {
            switch (cur.type) {
            case ATTR:
            case DIR:
            case DELETE:
                {
                    // Self-contained operation: retry just this record
                    int backoff = RANDOM.nextInt(BACKOFF_MILLIS);
                    info.println("Aborted transaction, backing off " +
                                 backoff + " millis");
                    sleepQuietly(backoff);
                }
                return true;
            case OPEN:
            case BLOCK:
            case CLOSE:
                if (OPEN_CLOSE_TRANSACTIONS) {
                    // The whole open..close transaction is gone, so
                    // redo everything leading up to this record
                    exponentialBackoff(cur.fid, "Aborted");
                    replaySequence(cur.fid);
                }
                // Now retry this record
                return true;
            case EOT:
                throw new Bug("Got EOT");
            default:
                return false;
            }
        }

        /**
         * Re-runs, in a fresh transaction, the records already
         * replayed for a file, repeating until the whole sequence
         * goes through without an abort.
         */
        private void replaySequence(long fid)
        {
            List<Auspex.Record> seq = _sequences.get(fid);
            // No sequence yet (e.g. the abort hit the first OPEN):
            // there is nothing to replay
            if (seq == null)
                seq = Collections.<Auspex.Record>emptyList();

            boolean success;
            do {
                success = true;
                _transactions.remove(fid);
                log.println("Retrying " + seq.size() +
                            " operations");
                for (Auspex.Record r : seq) {
                    try {
                        log.println("Retrying " + r);
                        process(r);
                    } catch (NoSuchDatum e2) {
                        info.println("Unexpected NoSuchDatum");
                        e2.printStackTrace();
                    } catch (AbortedException e2) {
                        _nAborts.getAndIncrement();
                        success = false;
                        exponentialBackoff(fid, "Re-aborted");
                        // The transaction is dead; running the rest
                        // of the sequence against it is pointless, so
                        // start over from the first record
                        break;
                    }
                }
            } while (!success);
        }

        /**
         * Sleeps for the file's current back-off plus random jitter
         * and doubles the back-off for the next abort on that file.
         * @param what prefix for the log message ("Aborted"/"Re-aborted")
         */
        private void exponentialBackoff(long fid, String what)
        {
            Integer backoff = _backoff.get(fid);
            if (backoff == null)
                backoff = BACKOFF_MILLIS;
            _backoff.put(fid, backoff*2);
            backoff += RANDOM.nextInt(backoff);
            info.println(what + " transaction, backing off " +
                         backoff + " millis");
            sleepQuietly(backoff);
        }

        /** Sleeps for the given time; an interrupt just ends the sleep early. */
        private void sleepQuietly(long millis)
        {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the back-off is best-effort
            }
        }

        /**
         * Disposes of a record that has been replayed: keeps it for a
         * possible replay (OPEN/BLOCK with retries enabled), releases
         * the file's kept records on CLOSE, or returns it to the
         * shared pool.
         */
        private void recycle(Auspex.Record cur)
        {
            try {
                if (RETRY_ABORTED && (cur.type == OPEN ||
                                      cur.type == BLOCK)) {
                    // Add the record to the appropriate sequence
                    List<Auspex.Record> seq = _sequences.get(cur.fid);
                    if (seq == null) {
                        seq = new ArrayList<Auspex.Record>();
                        _sequences.put(cur.fid, seq);
                    }
                    seq.add(cur);
                } else if (RETRY_ABORTED && cur.type == CLOSE) {
                    // Return the entire sequence to the pool; there
                    // is none if the CLOSE had no preceding OPEN/BLOCK
                    List<Auspex.Record> seq = _sequences.remove(cur.fid);
                    if (seq != null) {
                        for (Auspex.Record r : seq) {
                            _recordPool.put(r);
                        }
                    }
                    _backoff.remove(cur.fid);
                } else {
                    // Return the record to the pool of shared records
                    _recordPool.put(cur);
                }
            } catch (InterruptedException e) {
                throw new Bug(e);
            }
        }

        /**
         * Replays a single trace record by handing it to the routine
         * for its record type.  ATTR, DIR and DELETE are all replayed
         * as metadata accesses.
         * @throws AbortedException if the record's transaction aborts
         */
        private void process(Auspex.Record cur)
            throws AbortedException
        {
            switch (cur.type) {
            case OPEN:
                runOpen(cur);
                return;
            case CLOSE:
                runClose(cur);
                return;
            case BLOCK:
                runBlock(cur);
                return;
            case ATTR:
            case DIR:
            case DELETE:
                runMetaDataAccess(cur);
                return;
            case EOT:
                // EOT is handled by the caller and must never get here
                throw new Bug("Got EOT");
            }
        }

        /**
         * Replays a metadata operation as one transaction that
         * touches the file's metadata block: a read-only transaction
         * for reads (unless DONT_OPTIMIZE), otherwise a read/write
         * transaction that takes the block for writing when the
         * record is a WRITE.
         * @throws AbortedException if the transaction aborts
         */
        private void runMetaDataAccess(Auspex.Record rec)
            throws AbortedException
        {
            Long bid = _files.get(rec.fid);
            if (bid == null)
                // XXX Is this allowed if it's a WRITE?
                throw new Bug("Attempt to ATTR non-existent file " +
                              Long.toHexString(rec.fid));

            try {
                if(rec.action == READ && !DONT_OPTIMIZE) {
                    _nROTransactions.getAndIncrement();
                    ROSnapshot snap = _store.beginRO();
                    snap.readBlock(bid);
                    snap.commit();
                } else {
                    _nRWTransactions.getAndIncrement();
                    RWSnapshot snap = _store.beginRW();
                    if (rec.action == WRITE)
                        snap.readWriteBlock(bid);
                    else
                        snap.readBlock(bid);
                    snap.commit();
                }
            } catch (NoSuchDatum e) {
                // Keep the original exception as the cause so the
                // missing block can be identified from the trace
                throw new Bug("Metadata block missing", e);
            }
        }
        
        /**
         * Replays an OPEN: starts a transaction and reads the file's
         * metadata block, creating an empty file first when a
         * read/write transaction opens a file that does not exist.
         * In open/close mode the transaction is left open and
         * remembered for the file; otherwise it is committed at once.
         * @throws AbortedException if the transaction aborts
         */
        private void runOpen(Auspex.Record rec)
            throws AbortedException
        {
            Snapshot snap = null;

            if (OPEN_CLOSE_TRANSACTIONS)
                log.println("Starting " + rec.action +
                            " transaction on " + Long.toHexString(rec.fid));

            switch (rec.action) {
            case READ:
                if (!DONT_OPTIMIZE)
                {
                    _nROTransactions.getAndIncrement();
                    Long bid = _files.get(rec.fid);
                    // A read-only transaction cannot create the file,
                    // so fail with a message that names it
                    if (bid == null)
                        throw new Bug("Attempt to OPEN non-existent file " +
                                      Long.toHexString(rec.fid));
                    snap = _store.beginRO();
                    snap.readBlock(bid);
                    break;
                }
                // Fall through
            case RW:
            case WRITE:
                {
                    _nRWTransactions.getAndIncrement();
                    RWSnapshot rwsnap = _store.beginRW();
                    snap = rwsnap;
                    // Single lookup, so the decision and the block id
                    // come from the same view of the map
                    Long bid = _files.get(rec.fid);
                    if (bid != null) {
                        rwsnap.readBlock(bid);
                    } else {
                        // Create an empty file
                        info.println("Creating new file " +
                                     Long.toHexString(rec.fid));
                        Pair<Long,ByteBuffer> mdBlock =
                            rwsnap.newBlock(INODE_SIZE);
                        mdBlock.second.putInt(0);
                        _files.put(rec.fid, mdBlock.first);
                    }
                }
                break;
            }

            if (OPEN_CLOSE_TRANSACTIONS) {
                _transactions.put(rec.fid, snap);
            } else {
                snap.commit();
            }
        }

        /**
         * Replays a CLOSE.  In open/close mode this commits the
         * transaction that the matching OPEN left open and forgets
         * it; a CLOSE without an open transaction only logs a
         * warning.  In per-operation mode there is nothing to do.
         * @throws AbortedException if the commit aborts
         */
        private void runClose(Auspex.Record rec) 
            throws AbortedException
        {
            if (!OPEN_CLOSE_TRANSACTIONS)
                return;

            final String fileName = Long.toHexString(rec.fid);
            log.println("Committing transaction on " + fileName);

            Snapshot open = _transactions.get(rec.fid);
            if (open == null) {
                info.println("Warning: Close of unopened file " + fileName);
                return;
            }

            // Forget the transaction only once the commit went through
            open.commit();
            _transactions.remove(rec.fid);
        }
        
        /**
         * Replays a delete as a read/write transaction that takes the
         * file's metadata block for writing and commits.
         * @throws AbortedException if the transaction aborts
         */
        private void runDelete(Auspex.Record rec)
            throws AbortedException
        {
            _nRWTransactions.getAndIncrement();
            RWSnapshot txn = _store.beginRW();
            Long inodeId = _files.get(rec.fid);
            txn.readWriteBlock(inodeId);
            txn.commit();
        }
        
        /**
         * Replays a BLOCK record: touches every data block covered by
         * the byte range [offset, offset+size) of the file, reading
         * them for a READ and taking them for writing otherwise.
         * Writes first extend the file so the whole range exists.
         *
         * In open/close mode the file's open transaction is used (the
         * record is skipped with a warning if there is none);
         * otherwise the access runs in its own transaction.
         *
         * @throws NoSuchDatum if a block that is looked up does not exist
         * @throws AbortedException if the transaction aborts
         */
        private void runBlock(Auspex.Record rec)
            throws NoSuchDatum, AbortedException
        {
            // I have no idea why there are 0-size reads, but they
            // aren't necessarily preceded by an OPEN
            if (rec.action == READ && rec.size == 0)
                return;

            Snapshot snap;
            if (OPEN_CLOSE_TRANSACTIONS) {
                snap = _transactions.get(rec.fid);
                if (snap == null) {
                    info.println("Warning: Block request for unopened file " +
                                 Long.toHexString(rec.fid));
                    return;
                }
            } else {
                if (rec.action == READ && !DONT_OPTIMIZE) {
                    _nROTransactions.getAndIncrement();
                    snap = _store.beginRO();
                } else {
                    _nRWTransactions.getAndIncrement();
                    snap = _store.beginRW();
                }
            }

            // Range of data blocks covered by the request
            int firstBlock = rec.offset / DATA_BLOCK_SIZE;
            int nBlocks = ((rec.offset % DATA_BLOCK_SIZE) +
                           rec.size + DATA_BLOCK_SIZE - 1) / DATA_BLOCK_SIZE;

            // Make sure the file is large enough.  extendFile takes a
            // size in BYTES, so convert the block count back to bytes.
            if (rec.action != READ)
                extendFile((RWSnapshot)snap, rec.fid,
                           (firstBlock+nBlocks) * DATA_BLOCK_SIZE);

            // Get the metadata block; its first int is the number of
            // data blocks the file currently has
            ByteBuffer inode = snap.readBlock(_files.get(rec.fid));
            int existing = inode.getInt(0);

            for (int block = firstBlock; block < firstBlock + nBlocks; ++block) {
                if (rec.action == READ && block >= existing) {
                    info.println("Warning: Read beyond EOF");
                    break;
                }
                // Read the inode to find the data block
                ByteBuffer buf = inode;
                int[] r = route(block);
                for (int i = 0; i < r.length-1; ++i) {
                    buf = snap.readBlock(buf.getLong(r[i]));
                }
                long datablockid = buf.getLong(r[r.length-1]);
                // Retrieve the data block
                if (rec.action == READ) {
                    snap.readBlock(datablockid);
                } else {
                    ((RWSnapshot)snap).readWriteBlock(datablockid);
                }
            }

            if (!OPEN_CLOSE_TRANSACTIONS) {
                snap.commit();
            }            
        }
    }
                
    /**
     * Daemon thread, in its own "Server" thread group, that runs the
     * block server used by the trace.
     */
    private class TraceServer extends Thread
    {
        // The server instance run by this thread
        private final Server<BlockData> _s;
        
        /**
         * Creates (but does not start) the server.
         * @param port the port the server is created with
         * @throws IOException if the server cannot be created
         */
        public TraceServer(int port)
            throws IOException
        {
            super(new ThreadGroup("Server"), "Main");
            setDaemon(true);

            // Honor the requested port rather than a fixed constant
            _s = new Server<BlockData>("/tmp/anastore-cache/server",
                                       port,
                                       BlockData.FACTORY);            
        }
        
        /** Runs the server on this thread. */
        public void run()
        {
            //run the server
            try {
                _s.start();
            } catch (IOException e) {
                throw new Bug("Could not start the server", e);
            }
        }
        
        /**
         * Called when the trace is finished.  Currently does nothing:
         * stopping the server is not implemented here.
         */
        public void endTrace()
        {
            //stop the server
            //s.stop();
        }
    }

    /**
     * Replays the trace: starts every client, reads up to NUM_TRACES
     * records and hands each to the client of its host, then sends
     * every client an EOT record and waits for it to finish.
     *
     * While dispatching, a line of transfer and transaction
     * statistics is written to the statistics file every 100 records
     * and CPU-time comments every 1000 records.  The statistics file
     * is closed however this method exits.
     */
    public void run() 
    {
        Thread.currentThread().setName("Trace dispatcher");

        long serverStartTime = getThreadGroupCpuTime(_server.getThreadGroup());

        Auspex trace;
        try {
            trace = new Auspex(_traceFile);
        } catch (IOException e) {
            throw new Bug("Could not open Auspex trace", e);
        }

        PrintStream statsFile;
        try {
            statsFile = new PrintStream(_statsName);
        } catch (java.io.FileNotFoundException e) {
            throw new Bug(e);
        }

        try {
            statsFile.println("# Num hosts: " + _clients.size());
            statsFile.println("# Time (sec)  Sent  Recv  Total (MB)  Aborts  RO xactions  RW xactions");

            Auspex.Record rec;
            //make records for the record pool
            for(int i = 0; i < 2*_clients.size(); i++) {
                rec = new Auspex.Record();
                try {
                    _recordPool.put(rec);
                } catch (InterruptedException e) {
                    throw new Bug(e);
                }
            }

            //start all of the clients
            for(TraceClient client : _clients.values()) {
                client.start();
            }

            //start running the trace
            double firstTS = -1;
            for(int i = 0; i < NUM_TRACES; i++) {
                // Reuse a pooled record if one is free
                rec = _recordPool.poll();
                if (rec == null)
                    rec = new Auspex.Record();

                try {
                    trace.next(rec);
                } catch (NoSuchElementException e) {
                    break;
                } catch (IOException e) {
                    throw new Bug("Could not get another record", e);
                }

                if (firstTS == -1)
                    firstTS = rec.timestamp;
                if ((i+1) % 100 == 0) {
                    info.println("Statistics " + _stats);
                    double sent = ((double)_stats.getBytesSent())/(1024*1024);
                    double recv = ((double)_stats.getBytesReceived())/(1024*1024);
                    statsFile.format("%f %f %f %f %d %d %d\n",
                                     rec.timestamp - firstTS,
                                     sent, recv, sent + recv,
                                     _nAborts.get(),
                                     _nROTransactions.get(),
                                     _nRWTransactions.get());
                    statsFile.flush();
                }
                if ((i+1) % 1000 == 0) {
                    long clientTime = getThreadGroupCpuTime(_clientThreads);
                    long serverNowTime = getThreadGroupCpuTime(_server.getThreadGroup());
                    long serverTime = serverNowTime - serverStartTime;

                    // CPU times are in nanoseconds; report seconds
                    // with millisecond resolution
                    double clientSecs = (double)(clientTime/1000000)/1000;
                    statsFile.println("# Total client CPU time: " +
                                      clientSecs);
                    statsFile.println("# Average client CPU time: " +
                                      (clientSecs/_clients.size()));
                    statsFile.println("# Server CPU time: " +
                                      (double)(serverTime/1000000)/1000);
                }

                if (blacklisted(i, rec)) {
                    info.println("SKIPPING " + rec);
                    // Nobody will use this record: put it back so the
                    // pool does not drain on skipped records
                    _recordPool.offer(rec);
                    continue;
                }

                TraceClient client = _clients.get(rec.host);
                try {
                    if (!client.isAlive()) {
                        info.println("Client " + client + " died");
                        //break;
                    }
                    if (!client._q.offer(rec, 10, TimeUnit.SECONDS)) {
                        info.println("Client " + client + " appears wedged");
                        //break;
                    }
                } catch (InterruptedException e) {
                    throw new Bug("Master thread interrupted!", e);
                }
            }

            info.println("Killing clients");
            
            //kill clients: an EOT record tells a client to exit
            rec = new Auspex.Record();
            rec.type = Auspex.Type.EOT;
            
            for(TraceClient client : _clients.values()) {
                try {
                    if (client.isAlive()) {
                        client._q.offer(rec, 5, TimeUnit.SECONDS);
                        client.join();
                    } else {
                        info.println("Thread " + client + " is wedged");
                        client.setDaemon(true);
                    }
                } catch (InterruptedException e) {
                    throw new Bug("Master thread interrupted while trying to kill clients!", e);
                }
            }

            // Kill server
            _server.endTrace();

            // Print statistics
            info.println("OPEN_CLOSE_TRANSACTIONS: " + OPEN_CLOSE_TRANSACTIONS);
            info.println("RETRY_ABORTED: " + RETRY_ABORTED);
            info.println("DONT_OPTIMIZE: " + DONT_OPTIMIZE);
            info.println("FRESHNESS: " + FRESHNESS);
            info.println(_stats);
        } finally {
            // Close the statistics file on every exit path so buffered
            // output is not lost when the run fails part-way
            statsFile.close();
        }
    }

    /**
     * Tells whether a trace record is on the hard-coded list of known-bad
     * records.
     *
     * @param i   zero-based index of the record; the list itself is keyed
     *            by the one-based record number, i.e. {@code i+1}
     * @param rec the record at that index; it is only consulted by the
     *            sanity assertions on the two individually listed
     *            records, and those checks run only when assertions are
     *            enabled
     * @return {@code true} if the record is blacklisted, {@code false}
     *         otherwise
     */
    private static boolean blacklisted(int i, Auspex.Record rec)
    {
        final int recordNo = i + 1;

        // There's a whole chunk of records that gets repeated here
        if (recordNo >= 649004 && recordNo <= 649019)
            return true;

        switch (recordNo) {
        case 463:
            // This record opens a file that isn't read from until
            // record 34574.  This completely screws with expiration.
            assert rec.fid == 0x321e0000000540a6L && rec.host == 2120;
            return true;

        case 278986:
            // Ugh!  There was apparently a bug in the floating point
            // printer.  This record has a malformed timestamp.
            assert rec.fid == 0x271e00000000c3bbL && rec.host == 4131;
            return true;

        default:
            return false;
        }
    }

    /**
     * Scans the first {@code numRecords} records of a trace and verifies
     * that their timestamps never go backwards, except at blacklisted
     * records.
     *
     * @param tf         the trace file to scan
     * @param numRecords the number of records that must be present
     * @throws IOException propagated from opening or reading the trace
     * @throws IllegalArgumentException if the trace holds fewer than
     *         {@code numRecords} records, or if a record's timestamp is
     *         earlier than the last accepted timestamp and that record
     *         is not blacklisted
     */
    private static void checkTrace(File tf, int numRecords)
        throws IOException
    {
        final Auspex trace = new Auspex(tf);
        // One record object is reused for every read
        final Auspex.Record current = new Auspex.Record();
        // Timestamp of the most recent record that was not an anomaly
        double latest = 0;

        int idx = 0;
        while (idx < numRecords) {
            try {
                trace.next(current);
            } catch (NoSuchElementException e) {
                throw new IllegalArgumentException
                    ("Trace contains only " + idx + " records");
            }

            if (!(current.timestamp < latest)) {
                // Normal case: time did not move backwards
                latest = current.timestamp;
            } else {
                // An anomalous record never advances the reference
                // timestamp, so later records are still compared
                // against the last good one.
                info.println("Time anomaly at record " + (idx+1));
                if (!blacklisted(idx, current))
                    throw new IllegalArgumentException("Unknown anomaly");
            }

            ++idx;
        }
    }

    /**
     * Command-line entry point: parses the run configuration, checks the
     * trace file, then constructs a {@code TraceRunner} and runs it.
     *
     * <p>Usage: {@code <trace-file> <config> [stats-file]}, where
     * {@code <config>} has the form
     * {@code N-(oc|po)-(retry|noretry)-(rw|ro-F)}:
     * <ul>
     *   <li>{@code N} - number of trace records to run</li>
     *   <li>{@code oc} / {@code po} - whether the open/close flag passed
     *       to the runner is set ({@code oc}) or not ({@code po})</li>
     *   <li>{@code retry} / {@code noretry} - whether the retry-aborted
     *       flag is set</li>
     *   <li>{@code rw} - sets the don't-optimize flag and leaves
     *       freshness at 0; {@code ro-F} - clears that flag and uses the
     *       decimal number {@code F} (e.g. {@code 5} or {@code 0.5}) as
     *       the freshness</li>
     * </ul>
     * If {@code stats-file} is omitted, the name defaults to
     * {@code "stats-"} followed by the parsed configuration.
     *
     * @param args the command-line arguments described above
     * @throws IllegalArgumentException if fewer than two arguments are
     *         given or {@code <config>} is malformed
     */
    public static void main(String[] args) 
    {
        //testRoute();
        //if (true) return;

        // final Thread.UncaughtExceptionHandler ueh =
        //     Thread.getDefaultUncaughtExceptionHandler();
        // Thread.setDefaultUncaughtExceptionHandler
        //     (new Thread.UncaughtExceptionHandler() {
        //             public void uncaughtException(Thread t,
        //                                           Throwable e)
        //             {
        //                 ueh.uncaughtException(t, e);
        //                 // Avoid infinite loops
        //                 if (e instanceof Exception)
        //                     System.exit(1);
        //             }
        //         });

        // Fail with a usage message instead of an
        // ArrayIndexOutOfBoundsException when arguments are missing.
        if (args.length < 2)
            throw new IllegalArgumentException
                ("Usage: TraceRunner <trace-file> "
                 + "<N>-(oc|po)-(retry|noretry)-(rw|ro-<freshness>) "
                 + "[stats-file]");

        File traceFile = new File(args[0]);
        // The decimal point must be escaped: a bare '.' matches any
        // character, which would let input such as "ro-5x3" through
        // validation only to blow up in Double.parseDouble below.
        Pattern argParser = Pattern.compile
            ("([0-9]+)-(oc|po)-(retry|noretry)-(rw|ro-([0-9]+\\.?[0-9]*))");
        Matcher parsed = argParser.matcher(args[1]);
        if (!parsed.matches())
            throw new IllegalArgumentException(args[1]);
        int numTraces = Integer.parseInt(parsed.group(1));
        boolean openClose = parsed.group(2).equals("oc");
        boolean retryAborted = parsed.group(3).equals("retry");
        boolean dontOptimize = parsed.group(4).equals("rw");
        // Group 5 only participates in the match for the "ro-F" form
        double freshness = 0;
        if (!dontOptimize)
            freshness = Double.parseDouble(parsed.group(5));

        String statsName;
        if (args.length > 2)
            statsName = args[2];
        else
            // Default name mirrors the (parsed) configuration string
            statsName = "stats-" + numTraces + "-"
                + (openClose ? "oc-" : "po-")
                + (retryAborted ? "retry-" : "noretry-")
                + (dontOptimize ? "rw" : "ro-" + freshness);

        // Validate the trace up front, before the runner is constructed
        try {
            checkTrace(traceFile, numTraces);
        } catch (IOException e) {
            throw new Bug("Failed to check trace", e);
        }

        TraceRunner runner;

        try {
            runner = new TraceRunner(traceFile,
                                     numTraces, openClose, retryAborted,
                                     dontOptimize, freshness, statsName);
        } catch (IOException e) {
            throw new Bug("Could not start trace runner", e);
        }

        // Report the outcome even when run() exits by throwing
        boolean success = false;
        try {
            runner.run();
            success = true;
        } finally {
            if (success)
                info.println("TRACE DISPATCHER SUCCEEDED");
            else
                info.println("TRACE DISPATCHER FAILED");
        }
    }
}
