package anastore.net;

import java.util.*;
import java.io.*;
import java.net.*;
import anastore.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Implementation of the channel system protocol. This thread services
 * one connection and all of the channels that are associated with it.
 *<p>
 * Receive channels are implemented as blocking queues; this thread
 * receives incoming messages, and dispatches them to the appropriate
 * queue. Send channels forward their requests to this object, which
 * serializes them, assigns them an ID, and sends them over the wire.
 *<p>
 * The implementation does not treat synchronous and asynchronous
 * messages differently; asynchronous messages are just messages that
 * happen to drop their response channel on the floor.
 */
class ChannelProtocolThread extends Thread {
    /**
     * Channel ID of the root channel, i.e. the one that is implicitly
     * created upon connection setup.
     */
    private static final int ROOT_CHANNEL_ID = 0;

    private static final Log errlog = new Log();
    private static final boolean MSGLOG = false;
    private static final Log msglog = new Log(MSGLOG);

    /**
     * The socket associated with this connection.
     */
    private Socket socket;

    /**
     * Hostname of the other side of this connection, for debugging
     * purposes only.
     */
    private String hostname;
    private int port;

    /**
     * Serializers
     */
    private ObjectInputStream inStream;
    private ObjectOutputStream outStream;

    private final StreamStatistics stats;

    /**
     * Map from channel ID to associated receive channel. This is used
     * to dispatch incoming messages. It may be implemented as a
     * WeakValueMap, meaning that dropping references to a
     * ReceiveChannel will cause it to be removed.
     */
    private Map<Integer, QueueReceiveChannel> channelMap;

    /**
     * ID of the last message that was sent, i.e. the last receive
     * channel ID used.
     */
    private int lastChannelId = ROOT_CHANNEL_ID;

    /**
     * Saved IOException that occurred while receiving messages.
     */
    private IOException ioException = null;
    
    private ProxySendChannel rootSendChannel;
    private QueueReceiveChannel rootReceiveChannel;

    /**
     * Lock for ensuring that send requests, which are executed by
     * another thread, do not conflict.
     */
    private Object sendLock = new Object();
    

    /**
     * Blocking queue implementation of ReceiveChannel
     */
    private class QueueReceiveChannel implements ReceiveChannel {
        /**
         * Gratuitous wrapper class since it looks like one can't
         * insert null elements into a LinkedBlockingQueue.
         */
        private class QueueElement {
            public final Pair<Message, SendChannel> element;
            public QueueElement(Pair<Message, SendChannel> e) {
                element = e;
            }
        }
        
        private BlockingQueue<QueueElement> queue;
        private int id;

        public QueueReceiveChannel(int id) {
            this.id = id;
            queue = new LinkedBlockingQueue<QueueElement>();
        }

        public Pair<Message, SendChannel> receive()
            throws ChannelClosedException, InterruptedException {
            Pair<Message, SendChannel> r = queue.take().element;

            if (r != null) {
                return r;
            } else {
                throw new ChannelClosedException(ioException);
            }
        }

        public AsyncMessage receiveAsync()
            throws ChannelClosedException, InterruptedException,
                   IllegalStateException
        {
            Pair<Message, SendChannel> p = receive();
            if (p.first instanceof AsyncMessage)
                return (AsyncMessage)p.first;
            else
                throw new IllegalStateException("Received synchronous message");
        }

        void send(Message m, SendChannel c) {
            boolean success =
                queue.offer(new QueueElement(new Pair<Message, SendChannel>(m,c)));
            
            if (!success) {
                throw new Bug("Capacity-less queue exceeded capacity");
            }
        }

        void notifyError() {
            boolean success =
                queue.offer(new QueueElement(null));
            
            if (!success) {
                throw new Bug("Capacity-less queue exceeded capacity");
            }            
        }
    }

    /**
     * Implementation of a SendChannel that forwards its requests to
     * ChannelProtocolThread.protocolSend()
     */
    private class ProxySendChannel implements SendChannel {
        private int id;

        public ProxySendChannel(int id) {
            this.id = id;
        }

        public void send(AsyncMessage m)
            throws IOException {
            protocolSend(id, m);
        }

        public ReceiveChannel send(SyncMessage m)
            throws IOException {
            return protocolSend(id, m);
        }
    }
    
    /**
     * Creates a new <code>ChannelProtocolThread</code> instance.
     * It will not be possible to receive messages until the
     * thread is started.
     *
     * @param socket the socket for this connection
     * @param hostname hostname of the remote hot
     * @param port remote port number
     * @param stats the statistics gathered for this connection or
     * null
     */
    public ChannelProtocolThread(Socket socket,
                                 String hostname, int port,
                                 StreamStatistics stats) {
        super("ChannelProtocolThread: " +
              hostname + ":" + Integer.toString(port));
        setDaemon(true);

        this.socket = socket;
        this.hostname = hostname;
        this.port = port;
        this.stats = stats;

        channelMap = new WeakValueMap<Integer, QueueReceiveChannel>();
        rootReceiveChannel = new QueueReceiveChannel(ROOT_CHANNEL_ID);
        rootSendChannel = new ProxySendChannel(ROOT_CHANNEL_ID);
        channelMap.put(ROOT_CHANNEL_ID, rootReceiveChannel);

        /*
         * Set up the output stream. We do this now to eliminate a
         * race condition where another thread tries to start sending
         * messages while we are starting up.
         */
        try {
            OutputStream sos = socket.getOutputStream();
            if (stats != null) {
                sos = new CountingOutputStream(sos, stats);
            }
            outStream = new FastObjectOutputStream
                    (new BufferedOutputStream(sos));
        } catch (IOException e) {
            throw new RuntimeException("IOException while creating I/O streams" +
                                       e.toString());
        }

    }

    public void run() {
        
        // Create input stream
        try {
            InputStream sis = socket.getInputStream();
            if (stats != null) {
                sis = new CountingInputStream(sis, stats);
            }
            inStream = new FastObjectInputStream
                (new BufferedInputStream(sis));
        } catch (IOException e) {
            throw new RuntimeException("IOException while creating I/O streams" +
                                       e.toString());
        }
        
        // Loop forever, reading messages and dispatching them.
        try {      
            while (true) {
                try {
                    // Read a message
                    Object o = inStream.readObject();
                    if (!(o instanceof ChannelProtocolMessage)) {
                        errlog.println("Received object that was not a " +
                                       "ChannelProtocolMessage: " +
                                       o.toString());
                        continue;
                    }
            
                    ChannelProtocolMessage msg = (ChannelProtocolMessage) o;
//                     System.out.println("Received message for channel " +
//                                        msg.destChannelId +
//                                        "; reply channel " + msg.replyChannelId);
                    if (MSGLOG)
                        msglog.println("Received " + msg.msg);

                    // Find the channel to send it to
                    QueueReceiveChannel chan = channelMap.get(msg.destChannelId);

                    if (chan == null) {
                        errlog.println("Received message for non-existent " +
                                       "channel " + msg.destChannelId + ": " +
                                       msg.msg.toString());
                        continue;
                    }

                    chan.send(msg.msg,
                              new ProxySendChannel(msg.replyChannelId));
                } catch (ClassNotFoundException e) {
                    errlog.println(e.toString());
                }
            }
        } catch (IOException e) {
            // IOException while receiving. Store the exception,
            // notify the receive channels, and end this thread.
            ioException = e;

            for (QueueReceiveChannel chan : channelMap.values()) {
                chan.notifyError();
            }
        }
    }

    /**
     * Returns the root SendChannel: the one that is automatically
     * created on connection setup.
     */
    public SendChannel getRootSendChannel() {
        return rootSendChannel;
    }

    /**
     * Returns the root ReceiveChannel: the one that is automatically
     * created on connection setup.
     */
    public ReceiveChannel getRootReceiveChannel() {
        return rootReceiveChannel;
    }

    /**
     * Used only by ProxySendChannel to actually send a message.
     */
    private ReceiveChannel protocolSend(int destChannelId, Message m)
        throws IOException {

        /*
         * If an IOException caused the receive thread to exit, it
         * will be saved in ioException and we should rethrow it.
         * There may be a race condition here, but it doesn't really
         * matter since in that case trying to send will cause an
         * IOException of its own.
         */
        if (ioException != null) {
            throw new ChannelClosedException(ioException);
        }

        /*
         * Acquire lock to prevent two different threads from trying
         * to send at the same time (since this is executed by a
         * thread other than the ChannelProtocolThread).
         */
        synchronized (sendLock) {
            
            // Assign an ID and build a protocol message
            int replyChannelId = nextId();
            ChannelProtocolMessage msg =
                new ChannelProtocolMessage(destChannelId,
                                           replyChannelId, m);

            // Create a reply channel and add it to the dispatch table
            QueueReceiveChannel replyChannel =
                new QueueReceiveChannel(replyChannelId);
            channelMap.put(replyChannelId, replyChannel);

            // Send
            if (MSGLOG)
                msglog.println("Sending " + m);
            outStream.writeObject(msg);
            // XXX This deals with class descriptors really, really,
            // really inefficiently, but we need to do this or
            // everything we send will wind up in the receiver's
            // handle able and NEVER GO AWAY!
            outStream.reset();
            outStream.flush();
            return replyChannel;
        }
    }

    /**
     * Get the next available channel ID for an outgoing message's
     * reply channel.
     */
    private int nextId() {
        lastChannelId += 1;
        return lastChannelId;
    }
}
