package anastore.server;

import anastore.net.ChannelServer;
import anastore.net.ChannelServerListener;
import anastore.net.ReceiveChannel;
import anastore.net.SendChannel;
import anastore.store.NoSuchDatum;
import anastore.store.Storable;
import anastore.store.StorableFactory;
import anastore.store.Version;
import anastore.store.VersionStore;
import anastore.util.BlockData;
import anastore.util.IllegalTimestampException;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Server for the block store. This implements the central parts of
 * the server: it creates the VersionStore, instantiates the
 * ChannelServer to receive new connections, and creates a Client
 * object and a ClientHandlerThread for every new connection to
 * handle the protocol.
 *
 * The server also handles validation and commit of transactions.
 * {@link #validateAndCommit} is <code>synchronized</code>, so at most
 * one transaction is validated and committed at a time per server.
 *
 * @param <D> the type of datum kept in the versioned store
 */
public class Server<D extends Storable> {

    /** Port handed to the ChannelServer when {@link #start} is called. */
    private final int port;

    /** Created by {@link #start}; unset until then. */
    private ChannelServer channelServer;

    /** Path the VersionStore was opened on. */
    private final String pathname;

    /** The versioned store all transactions are checked against. */
    private final VersionStore<D> store;

    /** Turns the raw bytes of a write set back into data items. */
    private final StorableFactory<D> dFactory;

    /**
     * Creates a new <code>Server</code> instance and opens its
     * version store. The server does not accept connections until
     * {@link #start} is called.
     *
     * @param pathname path of the version store, passed to the
     * VersionStore as a <code>File</code>
     * @param port port to listen on once started
     * @param dFactory factory used by the store and used to
     * unserialize write data
     * @throws IOException if the VersionStore constructor fails
     */
    public Server(String pathname, int port,
                  StorableFactory<D> dFactory) throws IOException {
        this.port = port;
        this.pathname = pathname;
        this.dFactory = dFactory;
        this.store = new VersionStore<D>(new File(pathname), dFactory);
    }

    /**
     * @return the version store associated with this server
     */
    public VersionStore<D> getStore() {
        return store;
    }

    /**
     * Given a transaction and its read, write, and new sets,
     * determine whether the transaction can commit, and if so, commit
     * it.
     *
     * Validation fails if, for any entry of the read set, the lower
     * bound of the latest stored version's range differs from the
     * timestamp recorded in the read set, or if any id of the new set
     * can already be looked up in the store.
     *
     * @param client the client issuing the transaction; handed to the
     * store together with the write
     * @param r read set: block id mapped to the timestamp that must
     * still be the lower bound of the latest version of that block
     * @param w write set: block id mapped to the serialized data to
     * store for that block
     * @param n new set: block ids that must not exist in the store yet
     * @return the timestamp of the transaction, if validation
     * succeeded and the transaction committed.
     * @throws CommitFailedException if the transaction could not be
     * committed because of a concurrent access
     * @throws NoSuchDatum if the request was invalid
     * @throws IllegalTimestampException if the request was invalid
     * @throws IOException if an error occurred in the versioned store
     * while committing the transaction or checking the r/w/n sets.
     */
    public synchronized long validateAndCommit(Client client,
                                               Map<Long, Long> r,
                                               Map<Long, byte[]> w,
                                               Set<Long> n)
        throws CommitFailedException,
               NoSuchDatum, IllegalTimestampException,
               IOException {

        // Check whether read set is up to date
        for (Map.Entry<Long, Long> ent : r.entrySet()) {
            long id = ent.getKey();
            long ts = ent.getValue();

            long latestTS = store.getLatest(id).getRange().getLowerBound();
            if (latestTS != ts) {
                throw new CommitFailedException("Block id " + id +
                                                " superseded at " +
                                                latestTS);
            }
        }

        // Check whether any new blocks already exist. Only the lookup
        // sits inside the try, so the NoSuchDatum handler can never
        // intercept the commit failure raised below.
        for (long id : n) {
            boolean exists = true;
            try {
                store.getLatest(id);
            } catch (NoSuchDatum expected) {
                // OK: a new id is supposed to be unknown to the store
                exists = false;
            }
            if (exists) {
                throw new CommitFailedException("New id " + id +
                                                " already exists");
            }
        }

        // Unserialize write data
        // (Might as well do this as late as possible)
        Map<Long, D> writeItems = new HashMap<Long, D>();
        for (Map.Entry<Long, byte[]> ent : w.entrySet()) {
            writeItems.put(ent.getKey(),
                           dFactory.fromData(ent.getValue()));
        }

        // Assign the transaction the next timestamp
        long transactionTS = store.getLeastUpperBound();

        // Perform the write.
        // NOTE(review): client is passed for both trailing arguments;
        // their meaning is defined by VersionStore.write -- confirm there.
        store.write(transactionTS, writeItems, client, client);

        return transactionTS;
    }

    /**
     * Start listening. Creates the ChannelServer on this server's
     * port and registers a listener that wraps every established
     * connection in a Client and starts a ClientHandlerThread for it.
     *
     * @throws IOException if the ChannelServer cannot be created or
     * cannot start listening
     */
    public void start() throws IOException {
        channelServer = new ChannelServer(port);
        channelServer.listen(new ChannelServerListener() {
                // Note: the port parameter below shadows Server.port;
                // it is the value reported for the new connection.
                public void connectionEstablished(final String hostname,
                                                  final int port,
                                                  final SendChannel send,
                                                  final ReceiveChannel recv) {
                    Client<D> client = new Client<D>(hostname, port,
                                                     Server.this);
                    new ClientHandlerThread(client, send, recv).start();
                }});
    }

    /**
     * Command line entry point: starts a server storing BlockData.
     *
     * @param args <code>args[0]</code> is the store pathname,
     * <code>args[1]</code> the port number. If either is missing or
     * the port is not an integer, a usage message is printed to
     * standard error and the JVM exits with status 1.
     */
    public static final void main(final String[] args) {
        if (args == null || args.length < 2) {
            usageAndExit("expected a store path and a port");
            return;
        }

        final int listenPort;
        try {
            listenPort = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            usageAndExit("invalid port number: " + args[1]);
            return;
        }

        try {
            Server<BlockData> s =
                new Server<BlockData>(args[0],
                                      listenPort,
                                      BlockData.FACTORY);
            s.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Prints the problem and the expected command line, then exits with status 1. */
    private static void usageAndExit(String problem) {
        System.err.println("Server: " + problem);
        System.err.println("usage: Server <store-path> <port>");
        System.exit(1);
    }

}
