package anastore.server;

import anastore.net.SendChannel;
import anastore.proto.*;
import anastore.store.DeprecationHandler;
import anastore.store.NoSuchDatum;
import anastore.store.Storable;
import anastore.store.Version;
import anastore.store.VersionStore;
import anastore.util.Bug;
import anastore.util.Log;
import anastore.util.IllegalTimestampException;
import anastore.util.NotImplementedError;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Server's view of one of the clients it is serving. This class
 * implements the protocol, parsing incoming messages and formatting
 * outgoing messages, as well as doing the lookups for
 * GetBlock/GetLatest requests.
 *
 * Because I would like to save myself the trouble of debugging tricky
 * race conditions, currently every non-trivial operation is
 * synchronized on a lock associated with the Server object,
 * i.e. a global lock that serves to essentially eliminate all
 * concurrency. Obviously this is undesirable. It should be possible
 * to replace this with finer granularity locks.
 */
class Client<D extends Storable>
    implements DeprecationHandler {

    private static final Log log = new Log();
    private static final boolean LOG = true;
    
    /**
     * Remote hostname
     */
    private String hostname;

    /**
     * Remote port
     */
    private int port;

    /**
     * Server that this client is served from.
     */
    private Server<D> server;

    /**
     * Versioned store that this client is served from.
     */
    private VersionStore<D> store;

    /**
     * Timestamp of last write or deprecation message
     */
    private long lastTimestamp;

    /**
     * Channel via which to send deprecations.
     */
    private SendChannel deprecationChannel;
    
    /**
     * @return the client's remote hostname
     */
    public final String getHostname() {
        return hostname;
    }

    /**
     * @return the client's remote port number
     */
    public final int getPort() {
        return port;
    }

    /**
     * Creates a new <code>Client</code> instance.
     *
     */
    public Client(String hostname, int port,
                  Server<D> server) {
        this.hostname = hostname;
        this.port = port;
        this.server = server;
        this.store = server.getStore();
    }

    /**
     * Get the <code>LastTimestamp</code> value.
     *
     * @return a <code>long</code> value
     */
    public final long getLastTimestamp() {
        return lastTimestamp;
    }

    /**
     * Handle a HandshakeReq. For each of the unbounded entries
     * currently in the client's cache, look up whether they're fresh,
     * and, if they've been superseded, send out DeprecationReps. When
     * done, send a HandshakeRep.
     */
    public void handleHandshakeReq(HandshakeReq req,
                                   SendChannel replyChannel)
        throws IOException {

        // The reply channel for this message is the one to use for
        // sending all future deprecations.
        deprecationChannel = replyChannel;

        // Send deprecation replies to the client for any blocks that
        // are no longer current.
        synchronized (server) {
            Map<Long, Set<Long>> boundedBlocks =
                new TreeMap<Long, Set<Long>>();

            // Build a sorted list of out-of-date blocks so that the
            // deprecations can be sent in increasing timestamp
            // order.
            for (Map.Entry<Long, Long> ent : req.lowerBounds.entrySet()) {
                final long id = ent.getKey();
                final long ts = ent.getValue();

                Version<D> v = store.get(id, ts, this);
                if (v.getRange().hasUpperBound()) {
                    long ub = v.getRange().getUpperBound();
                    if (!boundedBlocks.containsKey(ub)) {
                        boundedBlocks.put(ub, new HashSet<Long>());
                    }
                    boundedBlocks.get(ub).add(id);
                }
            }

            for (long ts : boundedBlocks.keySet()) {
                sendDeprecation(ts, boundedBlocks.get(ts));
            }

            // Send handshake reply
            deprecationChannel.send(new HandshakeRep());            
        }
    }

    /**
     * Handle a GetBlockReq. Send back the requested version of the
     * requested block, assuming it exists. If the returned block is
     * unbounded, a deprecation handler will be installed to send a
     * DeprecationRep when it is superseded.
     */
    public void handleGetBlockReq(GetBlockReq req,
                                   SendChannel replyChannel)
        throws IOException {

        synchronized (server) {
            try {
                Version<D> v = store.get(req.id, req.timestamp, this);

                BlockRep rep = new BlockRep(v.getRange(),
                                            v.get().getData());
                replyChannel.send(rep);
            } catch (NoSuchDatum e) {
                replyChannel.send(new NoSuchDatumRep());
            } catch (IllegalStateException e) {
                // XXX Send error message
                throw new Bug(e);
            }            
        }
    }

    /**
     * Handle a GetBlockReq. Send back the current version of the
     * requested block, assuming it exists. A DeprecationRep will
     * be sent when it is superseded.
     */
    public void handleGetLatestReq(GetLatestReq req,
                                    SendChannel replyChannel)
        throws IOException {

        synchronized (server) {
            try {
                Version<D> v = store.getLatest(req.id, this);

                BlockRep rep = new BlockRep(v.getRange(),
                                            v.get().getData());
                replyChannel.send(rep);
            } catch (NoSuchDatum e) {
                replyChannel.send(new NoSuchDatumRep());
            } catch (IllegalStateException e) {
                // XXX Send error message            
                throw new Bug(e);
            }            
        }
    }

    /**
     * Handle a CommitReq message, committing a transaction. If the
     * transaction does not validate (optimistic concurrency failed),
     * a ConflictRep will be sent in reply. Otherwise, the transaction
     * will be committed, assigned the next available timestamp, and a
     * CommitRep will be sent.
     */
    public void handleCommitReq(CommitReq req,
                                SendChannel replyChannel)
        throws IOException {

        synchronized (server) {
            try {
                long ts = server.validateAndCommit(this,
                                                   req.r, req.w, req.n);
                if (LOG) log.println("Will commit transaction for " +
                                     hostname + ":" + port +
                                     " at time " + ts);
                if (lastTimestamp <= ts-1) {
                    sendDeprecation(ts-1, new HashSet<Long>());
                }
                CommitRep rep = new CommitRep(ts);
                if (LOG) log.println("Sending commit reply to  " +
                                     hostname + ":" + port +
                                     "  " + rep);
                lastTimestamp = ts+1;
                replyChannel.send(rep);
            } catch (CommitFailedException e) {
                if (LOG)
                    log.println("Commit failed: " + e.getMessage());
                replyChannel.send(new ConflictRep());
            } catch (NoSuchDatum e) {
                // XXX Send error message
                throw new Bug(e);
            } catch (IllegalArgumentException e) {
                // XXX Send error message
                throw new Bug(e);
            }
        }
    }

    /**
     * Handle a HiReq message, returning an empty invalidation with
     * the latest GLUB.
     */
    public void handleHiReq(HiReq req,
                            SendChannel replyChannel)
        throws IOException
    {
        synchronized (server) {
            sendDeprecation(store.getLeastUpperBound()-1,
                            new HashSet<Long>());
        }
    }
    
    /**
     * Send a deprecation message on the deprecation channel for the
     * specified set of blocks.  As a special case, this can also send
     * a "null" deprecation, which is an empty deprecation that
     * doesn't change the GLUB, but it used only for updating the PUB
     * on the client.
     */
    private void sendDeprecation(long ts, Set<Long> blocks)
        throws IOException {

        synchronized (server) {
            assert (ts >= lastTimestamp ||
                    (blocks.isEmpty() && ts == lastTimestamp - 1));
            DeprecationRep rep =
                new DeprecationRep(lastTimestamp, ts, blocks);
            if (LOG) log.println("Sending deprecation to " +
                                 hostname + ":" + port + "  " +
                                 rep);
            deprecationChannel.send(rep);
            lastTimestamp = ts + 1;            
        }
    }

    public void onDeprecation(Set<Long> deps, long upperBound) {
        try {
            sendDeprecation(upperBound, deps);
        } catch (IOException e) {
            // XXX Need to handle this and shut down the client
            // connection or something.
            throw new Bug(e);
        }
    }
}
