package anastore.proto;

import anastore.net.SyncMessage;

import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * A request to perform a commit.  This should receive zero or one
 * replies.  In the case of an abort due to a read or write conflict,
 * there should be no explicit reply, because a deprecation message
 * should have been sent out to abort the transaction.  In the case of
 * an abort due to a new block conflict, there should be a
 * ConflictRep.  In the case of a commit, there should be a CommitRep.
 *
 * <p>The three sets are declared transient and are transferred by the
 * custom {@code writeObject}/{@code readObject} pair below, which uses
 * a compact encoding that relies on the invariants enforced by
 * {@link #checkSets()}.
 */
public class CommitReq extends SyncMessage
{
    /** When true, the constructor validates the set invariants. */
    private static final boolean CHECK_SETS = true;

    /** Read set: maps an id to the timestamp associated with it. */
    public transient Map<Long, Long> r;
    /** Write set: maps an id to the data array being written. */
    public transient Map<Long, byte[]> w;
    /** New set: the ids that are new in this request. */
    public transient Set<Long> n;

    /**
     * Construct a commit request from read, write, and new sets.  All
     * of these sets <i>must not</i> be modified after they are passed
     * to this constructor.  Likewise, the data arrays of written
     * blocks <i>must not</i> be modified.  The w set must be a subset
     * of the union of r and n and n must be a subset of w.
     *
     * <p>The arguments are wrapped in unmodifiable views, not copied,
     * which is why the caller must not modify them afterwards.
     *
     * @param r the read set
     * @param w the write set
     * @param n the new set
     * @throws IllegalArgumentException if the subset requirements
     *         above are violated (only checked when CHECK_SETS is on)
     */
    public CommitReq(Map<Long, Long> r, Map<Long, byte[]> w, Set<Long> n)
    {
        this.r = Collections.unmodifiableMap(r);
        this.w = Collections.unmodifiableMap(w);
        this.n = Collections.unmodifiableSet(n);

        if (CHECK_SETS) {
            checkSets();
        }
    }

    /**
     * Assert that the r, w, and n sets obey the properties required
     * by the commit request: n is a subset of the keys of w, and the
     * keys of w are a subset of the union of the keys of r and n.
     *
     * @throws IllegalArgumentException if either property is violated
     */
    private void checkSets()
        throws IllegalArgumentException
    {
        if (!w.keySet().containsAll(n)) {
            throw new IllegalArgumentException
                ("Write set is not a superset of the new set");
        }

        Set<Long> rUnionN;
        if (n.isEmpty()) {
            // Nothing to add, so the key view of r can be used as-is
            // without allocating a copy.
            rUnionN = r.keySet();
        } else {
            rUnionN = new HashSet<Long>(r.keySet());
            rUnionN.addAll(n);
        }
        if (!rUnionN.containsAll(w.keySet())) {
            throw new IllegalArgumentException
                ("Write set is not a subset of the union of the read and new sets");
        }
    }

    /**
     * Produce a short human-readable summary of the request listing
     * the read set, the keys of the write set, and the new set.
     */
    @Override
    public String toString()
    {
        StringBuilder res = new StringBuilder("CommitReq<r=[");
        Util.summarize(res, r);
        res.append("],w=[");
        Util.summarize(res, w.keySet());
        res.append("],n=[");
        Util.summarize(res, n);
        res.append("]>");
        return res.toString();
    }

    /**
     * Write the commit request to the given output stream, using a
     * compact representation.
     *
     * <p>Layout after the default serialized fields: an int holding
     * the number of write-set entries; then, per entry, a long id, a
     * long timestamp (or -1 when the id is in the new set), an int
     * data length, and the data bytes; finally the read-set entries
     * that were not covered by the write set, written with
     * {@code Util.writeMapLongLong}.
     *
     * @param s the stream to write to
     * @throws IOException if the underlying stream fails
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException 
    {
        s.defaultWriteObject();

        // Because every element of w must be in either r or n, and n
        // must be a subset of w, we can avoid explicitly writing out
        // n and the intersection of r and w by writing, along with
        // each element of w, the associated timestamp from r, or -1
        // if it was in n.
        //
        // NOTE(review): this encoding reserves -1 as the "new block"
        // marker.  If a written, non-new block ever carried a read
        // timestamp of -1, the receiver would decode it as a new
        // block and drop it from r.  Presumably timestamps are never
        // -1; confirm against whatever assigns them.
        Map<Long, Long> rMinusW = new HashMap<Long, Long>(r);
        s.writeInt(w.size());
        for (Map.Entry<Long, byte[]> entry : w.entrySet()) {
            Long id = entry.getKey();
            s.writeLong(id);
            if (n.contains(id)) {
                s.writeLong(-1);
            } else {
                // Unboxes the timestamp; this relies on the invariant
                // that a non-new written id is present in r.
                s.writeLong(r.get(id));
                rMinusW.remove(id);
            }
            byte[] data = entry.getValue();
            s.writeInt(data.length);
            s.write(data);
        }

        Util.writeMapLongLong(s, rMinusW);
    }

    /**
     * Reconstitute a commit request from the layout produced by
     * {@code writeObject}.
     *
     * <p>Length fields read from the stream are validated before they
     * are used to size a collection or an array, so that a corrupt
     * stream is reported as an {@link IOException} instead of
     * surfacing as an unchecked exception from the allocation.
     *
     * @param s the stream to read from
     * @throws IOException if the underlying stream fails or contains
     *         a negative length
     * @throws ClassNotFoundException if default deserialization
     *         cannot resolve a class
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException
    {
        s.defaultReadObject();

        int wSize = s.readInt();
        if (wSize < 0) {
            throw new IOException
                ("Corrupt CommitReq stream: negative write set size " + wSize);
        }

        Set<Long> newIds = new HashSet<Long>();
        Map<Long, Long> rIntersectW = new HashMap<Long, Long>();
        Map<Long, byte[]> writes = new HashMap<Long, byte[]>(wSize);
        for (int i = 0; i < wSize; ++i) {
            Long id = s.readLong();
            long ts = s.readLong();
            if (ts == -1) {
                // -1 marks an id that was in the new set.
                newIds.add(id);
            } else {
                rIntersectW.put(id, ts);
            }

            int dataSize = s.readInt();
            if (dataSize < 0) {
                throw new IOException
                    ("Corrupt CommitReq stream: negative data size " + dataSize
                     + " for block " + id);
            }
            byte[] data = new byte[dataSize];
            s.readFully(data);
            writes.put(id, data);
        }

        // The remainder of the read set was written separately; merge
        // the timestamps recovered from the write entries back in to
        // rebuild the full read set.
        Map<Long, Long> reads = Util.readMapLongLong(s);
        reads.putAll(rIntersectW);

        this.r = Collections.unmodifiableMap(reads);
        this.w = Collections.unmodifiableMap(writes);
        this.n = Collections.unmodifiableSet(newIds);
    }
}
