\chapter{Storage Layer Support}
\label{cha:storage-layer}

The protocol that \txcache uses for ensuring whole-system consistency
requires some support from the storage layer. Specifically, it
requires that the storage layer provide the following two features:
\begin{enumerate}
\item It must allow the \txcache library to specify which timestamp a
  read-only transaction will run at. If the storage layer is not able
  to provide access to all previous timestamps, it must ensure 
  that application servers are aware of which ones are
  available.
\item It must report the validity interval associated with every
  query result it provides
\end{enumerate}
The first requirement is needed to ensure that data obtained from the
storage layer (in the event of a cache miss) is consistent with data read from the cache. The second
requirement is needed to determine the validity intervals for cached
data.

This chapter discusses how to meet these requirements. We discuss how
to provide them in two systems. First, we present a simple block store
that uses multiversion concurrency
(\autoref{sec:storage-layer:blockstore}); it gives an example of how
to meet these requirements in a simple system designed from the ground
up to support them. Next, we discuss how to provide support for
relational databases (\autoref{sec:storage:db}), where integrating with
existing concurrency control mechanisms and complex query processing
presents more challenges.

An important concern is that these requirements can be met easily by
existing systems. This matters because a cache that operates only with
a radically different backing store is of limited value. Although our
cache support does require some modifications to the database, we
argue that they are not onerous. In particular, we show that existing
multiversion databases -- which include most of the database systems
in common use today -- can easily be modified to support these
requirements.

\section{A Transactional Block Store}
\label{sec:storage-layer:blockstore}

The first storage system we describe is a simple transactional block
store. We present this system mainly as an instructive example of how
one might design a storage system from the ground up to meet our
cache's requirements.
%, rather than a practical system.
We note,
however, that many practical storage systems do provide a similar
interface -- either as an end in itself, as in the case of many
``NoSQL'' key-value
stores~\cite{decandia07:_dynam,redis,escriva11:_hyper,memcachedb} --
or as a primitive for building larger distributed
systems~\cite{lee96:_petal}.

Our block store uses a trivial data model: it maintains a set of
\emph{blocks}, identified by a 64-bit integer ID, which store opaque
data. The only data operations it provides are to \command{get} the
value of a block or replace it with a new value (\command{put}).

The system guarantees serializable isolation for transactions, even
when multiple data blocks are read or modified within a transaction.
As shown in \autoref{fig:storage:block-store-api}, the block store
provides functions to begin, commit, and abort a transaction.  Note
that operations like \command{get} and \command{put} do not need to
explicitly identify which transaction they are a part of. Rather, the
active transaction ID is kept as session state: after a
\command{begin} operation starts a transaction, subsequent operations
sent on the same connection are treated as part of that
transaction. We use this connection-oriented interface because it
matches the interface provided by relational databases, and we wanted
the same \txcache library to be compatible with both.


An important note about the interface provided by this block store is
that it allows a read-only transaction to be started at \emph{any}
timestamp. That is, it has no notion of pinned snapshots.
%, and so it
%works with the simpler protocol in \XXX rather than the more complex
%one in \XXX.
In order to allow transactions to run at any timestamp, the block
store retains old versions of data indefinitely; we do not discuss
garbage-collection of old data versions.
% If the system were extended
% to remove old versions then .




\begin{figure}[tbp]
   \addtolength{\leftmargini}{1.5em}
   \itemindent=-1.5em
   \begin{itemize}
 \item \command{begin-ro}(\cmdarg{timestamp}) :
   Start a read-only transaction running at the specified
   timestamp. If no timestamp is specified, uses the latest timestamp.
 \item \command{begin-rw}() :
   Start a read/write transaction; always uses the latest timestamp.
 \item \command{commit}() $\to$ \cmdarg{timestamp} : End a
   transaction, returning the timestamp at which it ran.
 \item \command{abort}() :
   End a transaction, discarding any changes it made.
   \vspace{0.5em}
 \item \command{put}(\cmdarg{id, data}) :
   Store the specified \cmdarg{data} in the block identified by
   \cmdarg{id}. This replaces the existing data in the block if it
   exists, or creates it if it does not.
 \item \command{get}(\cmdarg{id}) $\to$ \cmdarg{data, validity} :
   Returns the data stored in block \cmdarg{id} and its associated
   validity interval.
  \end{itemize}

  \caption{Block store API}
  \label{fig:storage:block-store-api}
\end{figure}



\subsection{Implementation}

The block store server is built using a versioned storage system
similar in design to the \txcache cache server. Like the cache, it
stores a chain of versions for each block ID. Each version of a block
contains the associated data, and is tagged with an associated
validity interval; the validity interval's end is null if the
version is still current. Unlike the cache, data is stored
persistently on disk, and all versions are present.

%\drkp{describe this as a separate protocol for r/w and r/o xacts?}

\autoref{fig:storage:block-store-implementation} gives a sketch of the
implementation of the block store.\footnote{The implementation
  described in \autoref{fig:storage:block-store-implementation} is
  necessarily simplified. Among other things, it does not discuss
  issues related to on-disk storage, or describe the locking required
  to allow concurrent processing of requests. Further details about
  the block store implementation are available in a separate
  report~\cite{830ports07:_optim_distr_read_only_trans}.} Starting a
transaction assigns a read timestamp (\texttt{read\_ts}) for the
transaction; for a read-only transaction this is the timestamp
specified by the \txcache library, whereas for a read/write
transaction it is always the latest timestamp. All \command{get}
operations in a transaction are performed with respect to this
timestamp; they see only data that is valid at this timestamp.  A
\command{get} operation, therefore, locates and returns the version of
the block (if one exists) that was created before
the transaction's read timestamp, and was not replaced before the
transaction's read timestamp.
In a read/write transaction, creating or updating a block with the
\command{put} command creates a new version with the appropriate data,
but does not yet fill it in its validity interval or install it into
the chain of versions for that block ID.

Our block store ensures serializability using optimistic concurrency
control~\cite{kung81:_optim_method_for_concur_contr}. (This isn't a
fundamental choice; it would be equally reasonable to use strict
two-phase
locking~\cite{eswaran76:_notion_consis_predic_locks_datab_system} to
prevent concurrent updates.) Read-only transactions do not require any
concurrency control checks on commit. Committing a read/write
transaction first performs a validation phase. If any of the blocks
read or written by that transaction have been updated since the
transaction started, the transaction is aborted to prevent violating
serializability. Otherwise, the transaction is permitted to commit and
assigned a timestamp. The server then installs the transaction's
changes: it sets the start of the validity interval on the new version
of all modified blocks to the transaction's timestamp, and links the
version into the chain of previous versions.

\begin{figure}[tbp]
  \scriptsize
\begin{alltt}
\textit{// per-session state}
timestamp read_ts
set<block> readset
set<block> writeset


\textbf{begin-ro}(timestamp) \{
  read_ts = timestamp
\}

\textbf{begin-rw}() \{
  read_ts = latest_ts
\}

\textbf{get}(id) \{
  \textit{// add to readset}
  readset += id

  \textit{// walk through the list of versions}
  block = latest_version[id]
  while block != null \{
    \textit{// does this version's validity interval}
    \textit{// include the timestamp?}
    if (read_ts >= block.validity.start &&
        (read_ts < block.validity.end ||
         block.validity.end == null)) \{

      if (block.validity.end != null) \{
          \textit{// use block's validity}
          return (block.data, block.validity)
      \} else \{
          \textit{// block is still valid, so bound its}
          \textit{// validity interval at the latest time}
          return (block.data,
                  [block.validity.start, latest_ts])
      \}
    \}
    block = block.prev_version
  \}
  \textit{// no acceptable version found}
  return not-found
\}
\end{alltt}
  \caption{Sketch of the block store implementation}
\end{figure}  
\begin{figure}[tbp]
\ContinuedFloat
\scriptsize
\begin{alltt}  
\textbf{put}(id, data) \{
  block = allocate_new_block()
  block.id = id
  block.data = data
  \textit{// will fill in block's validity interval later}
  writeset += block
\}

\textbf{commit}() \{
  \textit{// don't need a validation phase}
  \textit{// for read-only transactions}
  if transaction is read-only
    return read_ts

  \textit{// OCC validation phase}
  for b in (readset + writeset)
     if b modified since transaction start
        abort()

  \textit{// assign timestamp to transaction}
  ts = increment(latest_ts)

  \textit{// install new versions}
  for b in writeset
     b.validity.start = ts
     b.validity.end = null
     b.prev_version = latest_version[b.id]
     b.prev_version.validity.end = ts
     latest_version[b.id] = b

  return ts
\}
\end{alltt}
  \caption{Sketch of the block store implementation (continued)}
  \label{fig:storage:block-store-implementation}
\end{figure}

Note that this design meets both of \txcache's storage layer
requirements. First, it allows the \txcache library to control the
timestamp of read-only transactions (as an argument to
\command{begin-ro}), by using multiversioned storage. Second, it can
report the validity interval associated with each query.  Because the
query model is simple, it is trivial for the system to compute the
validity interval of a \command{get} query: it is simply the validity
interval of the block version accessed. If, however, that block
version is currently valid (\ie the upper bound of its validity
interval is null), the block store returns a validity interval that
ends at the latest timestamp, as shown in
\autoref{fig:storage:block-store-implementation}. This truncated
interval is a conservative one; \autoref{cha:invalidations} describes
how to use invalidations to give a better answer.

A final note about the implementation is that, for read-only
transactions, the \command{begin-ro} and \command{commit} operations
effectively do nothing at all. They could be eliminated in favor of
having clients simply indicate a timestamp as an additional argument
to \command{get} operations. However, we use the interface described
here for compatibility with the standard transaction interface used by
relational databases.

\section{Supporting Relational Databases}
\label{sec:storage:db}

A typical deployment of our system will likely use a relational
database management system (RDBMS) as its storage layer, as this is
the most common storage architecture for web applications. As with the
block store, we must ensure that it satisfies \txcache's two storage
layer requirements: it must allow control over which timestamp is used
to run read-only transactions, and it must report the validity
intervals for each query result.  However, relational databases have a
number of significant differences from the simple block store we
previously examined, which present new challenges for meeting these
requirements. We list four differences below:

\begin{enumerate}
\item Databases typically do not have explicit support for running
  transactions on any state other than the current one, and providing
  this requires integrating with existing concurrency control
  mechanisms
\end{enumerate}
This makes it more challenging to allow the \txcache library control
over which timestamp is used to execute read-only queries, compared to
the block store, as the database was not designed from the ground up
to explicitly provide this functionality.

\begin{enumerate}[resume]
\item Queries do not return a single object. Instead, they access
  subsets of relations, specified declaratively -- for example, all
  auctions whose current price is below \$10.
\item Queries involve significant processing in the database: data
  might be obtained using a variety of index access methods and
  subsequently filtered, transformed, or combined with other records.
\end{enumerate}
These two differences make it more complex to compute the validity
intervals for query results. In the block store, which accessed only a
single object per query, this task was straightforward; in a database,
a single query performs a computation over many different data objects
(rows), some of which might not ultimately contribute to the query
result.

\begin{enumerate}[resume]
\item Finally, databases are large, complex, pre-existing pieces of
  software, unlike the block store, which was designed from scratch
\end{enumerate}
This final difference doesn't present any specific problems but guides
our approach: a practical solution to adding caching support must not
involve redesigning major components of the RDBMS.

Note that although we discuss these requirements in the context of a
relational database, they are not specific to such databases. Some of
the challenges described above also arise in real key-value stores,
which are typically more complex and full-featured than the simple
block store we described previously. For example, many support some
form of scanning or searching
operation~\cite{chang06:_bigtab,escriva11:_hyper}.

Although standard databases do not provide the features we need for
cache support, we show they can be implemented by reusing the same
mechanisms that are used to implement widely-used multiversion
concurrency control techniques like snapshot isolation.  In this
section, we describe how we modified an existing DBMS,
PostgreSQL~\cite{postgresql}, to provide the necessary support. The
modifications required are not extensive: our implementation added or
modified less than 1000 lines of code. Moreover, they are not
\postgres-specific; the approach can be applied to other databases
that use multiversion concurrency.



\subsection{Exposing Multiversion Concurrency}
\label{sec:storage:db:concurrency}

Because our cache allows read-only transactions to run slightly in the
past, the database must be able to perform queries against a past
snapshot of a database. This situation arises when a read-only
transaction is assigned a timestamp in the past and reads some cached
data, and then a later operation in the same transaction results in a
cache miss, requiring the application to query the database. The
database query must return results consistent with the cached values
already seen, so the query must execute at the same timestamp in the
past.

One solution could be to use a \emph{temporal
  database}~\cite{oezsoyoglu95:_tempor_and_real_time_datab}, which
tracks the history of its data and allows ``time travel'' -- running
queries against previous database state. Although such databases
exist, they are not commonly used, because retaining historical data
introduces substantial storage and indexing cost.\footnote{The
  POSTGRES research database, on which \postgres is based, was one
  such
  system~\cite{stonebraker87:_desig_of_postg_storag_system}. However,
  time travel support was removed early in transition to an
  open-source production database, precisely because it was considered
  too expensive.} Therefore, we do not consider this a practical
solution.

Furthermore, these systems are overkill: they are designed to support
complex queries over the entire history of the database. What we
require is much simpler: we only need to run a transaction on a stale
but recent snapshot. Our insight is that these requirements are
essentially identical to those for supporting snapshot
isolation~\cite{berenson95:_critiq_of_ansi_sql_isolat_level}, so many
databases already have the infrastructure to support them. Our
modifications, then, are intended to expose control over these
internal mechanisms to the \txcache library.

\subsubsection{Background: Multiversion Concurrency Control in \postgres}
\label{sec:storage:db:background}

Our techniques for adding support for \txcache's requirements to
existing databases take advantage of their existing
multiversion concurrency control (MVCC) mechanisms. Accordingly, we
begin with a primer on how MVCC techniques are implemented. For
concreteness, we describe the implementation of MVCC in \postgres, the
database we use in our implementation. We emphasize, however, that the
techniques we describe are not \postgres-specific but can be applied
to other databases that use MVCC. Much of what we describe in this
section is true of other MVCC implementations; we indicate some
important differences.

Databases using multiversion concurrency control store one or more
versions of each tuple. Internally to the storage manager, each
transaction has an ID that is used to identify which changes it
made. \postgres tags each tuple version with the ID of the transaction
that created that version (xmin) and the ID of the transaction that
deleted it or replaced it with a new version, if one exists
(xmax). When a transaction modifies a tuple (an \command{update}
statement), it is treated logically as two operations: deleting the
previous tuple version, and inserting the new version.  The tag on
each tuple version is similar to our use of validity intervals, with
one important exception: \postgres's transaction IDs are opaque
identifiers rather than timestamps, \ie their numeric ordering does
not correspond to the commit order of transactions. (This is because
transaction IDs are assigned when transactions \emph{start}, not when
they commit.)

Each query is performed with respect to a \emph{snapshot}, which is
represented as a set of transaction IDs whose changes should be
visible to the query. When a transaction starts, \postgres
\emph{acquires a snapshot} by computing the set of transactions that
have already committed, and uses that snapshot for the transaction's
queries.  During the execution of a transaction, the \postgres storage
manager discards any tuples that fail a \emph{visibility check} for
that transaction's snapshot, \eg those that were deleted by a
transaction that committed before the snapshot, or created by a
transaction not yet committed.

A ``vacuum cleaner'' process periodically scans the database and
removes old tuple versions that are no longer visible to any running
transaction.


% \subsubsection{Commit Ordering}

% One of the assumptions we have made throughout the preceding sections
% is that all transactions are assigned a timestamp that corresponds to
% their commit order, and this timestamp serves as the transaction's
% identifier. This is a useful property, as it reflects the relative
% ordering of transactions and can be used to determine which
% transactions appear in which snapshots. However, \postgres (like many
% other databases) identifies transactions with a transaction ID that
% does not reflect the commit order; this is because the transaction ID
% is needed before the transaction commits.

% We therefore modify \postgres to assign a timestamp to each read/write
% transaction when it commits, and keep a table in memory that maps this
% timestamp to the transaction ID used internally, and vice versa. These
% mappings do not need to be retained indefinitely; as we will see, we
% do not need accurate information about the commit order of
% transactions that committed before the earliest pinned snapshot.


\subsubsection{Pinned Snapshots}
\label{sec:storage:mvcc:pinned-snapshots}

\txcache needs the database to be able to run a read-only transaction
on a recent state of the data. \postgres has the ability to run
transactions with respect to snapshots, so it seems well suited to
provide this feature. However, there are two challenges to providing
access to past snapshots. First, we need to ensure that the old tuple
versions are still present; if they are sufficiently old, the vacuum
cleaner process might have removed them. Second, we need to preserve
the \emph{metadata} required to reconstruct that state of the
database, \ie the set of transactions that need to appear in the
snapshot. \postgres is able to compute the \emph{current} snapshot,
\ie the set of currently committed transactions. But it does not know
the \emph{order} in which those transactions committed, so it does not
know which transactions should be visible in previous snapshots.

We provided an interface that allows the current snapshot to be saved
so that it can be used by subsequent read-only
transactions.\footnote{This feature has other uses besides supporting
  a transactional cache. A ``synchronized snapshots'' feature that
  provides essentially the same functionality was independently
  developed and is scheduled to appear in an upcoming release of
  \postgres. The semantics are similar, though not exactly
  the same, to the ones we describe here.}
We added
a \command{pin} command that acquires and saves the current
snapshot. The \command{pin} command returns an identifier that can be
provided as an argument to a later \command{begin snapshotid} command,
starting another transaction that sees the same view of the database.
The database state for that snapshot remains available at least until
it is released by the \command{unpin} command. A pinned snapshot is
identified by the number of read/write transactions that committed
before it, allowing it to be easily ordered with respect to update
transactions and other snapshots.

One way to think of this interface is that the \command{pin} command
starts a new read-only transaction, but does not associate it with the
active connection (as typically happens when starting a
transaction). Rather, it gives it a name that allows later read-only
queries to be run with the same snapshot, effectively making them part
of the same transaction.

\autoref{fig:storage:pinned-snapshot-example} demonstrates the use of
this API. The example consists of five transactions. Transactions 1
and 2 set up a table that contains four values. After transaction 2,
the current snapshot is pinned; it is identified by the latest visible
transaction's timestamp (2). Transactions 3--5 delete two values from
the table and insert one new one. Finally, we run a read-only
transaction and specify that it should run at timestamp 2. This
transaction sees the state of the table at the time the snapshot was
pinned, \ie it does not see the effects of transactions 3--5.

\begin{figure}[tbp]
  \begin{lstlisting}[language={[example]sql},columns=fullflexible]
-- setup   
test=# CREATE TABLE test (x int);
CREATE TABLE

-- timestamp 2
test=# INSERT INTO test VALUES ('1'), ('2'), ('3'), ('4');                      
INSERT 0 4

-- Pin the current snapshot
-- The snapshot is identified by the timestamp of the latest visible transaction: 2
-- The numbers after '2' are a representation of the database's wall-clock time.
test=# PIN;
PIN 2 1334270999 97317

-- timestamp 3
test=# DELETE FROM test WHERE x=4;                                              
DELETE 1

-- timestamp 4
test=# DELETE FROM test WHERE x=3;                                              
DELETE 1

-- timestamp 5
test=# INSERT INTO test VALUES ('5');                                           
INSERT 0 1

-- This read-only transaction is explicitly specified to run at
-- timestamp 2; therefore, it does not see the changes of transactions 3-5
test=# BEGIN READ ONLY SNAPSHOTID 2;
BEGIN
test=# SELECT * FROM test;
 x 
---
 1
 2
 3
 4
(4 rows)
SELECT VALIDITY 2 3

test=# COMMIT;    
  \end{lstlisting}
  \caption{Demonstration of pinned snapshots in our modified \postgres}
  \label{fig:storage:pinned-snapshot-example}
\end{figure}

In \postgres, we implement pinned snapshots by saving a representation
of the snapshot (the set of transaction IDs visible to it) in a table,
and preventing \postgres's vacuum cleaner process from removing the
tuple versions in the snapshot. \postgres is especially well-suited to
this modification because of its no-overwrite storage
manager~\cite{stonebraker86:_desig_of_postg}. Because stale data is
already removed asynchronously, the fact that we keep data around
slightly longer has little impact on performance. However, our
technique is not \postgres-specific; any database that implements
snapshot isolation must keep a similar history of recent database
states. For example, the Oracle DBMS stores previous versions of
recently-changed data in a separate ``rollback segment''. The
similarity between the requirements for creating a new pinned snapshot
and those for starting a new transaction (which runs on a snapshot of
the database) suggests that this interface should be straightforward
to implement regardless of how the database's storage manager is built.


% The implementation of pinned snapshots takes advantage of \postgres's
% multiversion storage. Like the block store described above,
% \postgres uses a ``no-overwrite'' storage
% manager~\cite{stonebraker87:_desig_of_postg_storag_system} that stores
% multiple versions of each tuple, tagged with the IDs of the
% transaction that created and deleted (if appropriate) that
% version. When a new transaction starts, it determines the list of
% previously committed transactions (represented as a
% \texttt{SnapshotData} object). Thereafter, the \postgres storage
% manager discards any tuples that fail a \emph{visibility check} for
% that transaction's snapshot, \eg those that were deleted by a
% transaction that committed before the snapshot, or created by a
% transaction not yet committed. A ``vacuum cleaner'' process
% periodically scans the database and removes old tuple versions that
% are no longer visible to any running transaction.

% Given this design, implementing pinned snapshots is straightforward:
% the \command{pin} command simply obtains a \texttt{SnapshotData} object
% just as starting a new transaction would, and saves it for future
% use. The \command{begin snapshotid} command restores the corresponding
% saved snapshot. The system also prevents the vacuum cleaner process
% from removing any tuple versions unless they are older than the oldest
% pinned snapshot.


% \subsubsection{Ensuring Serializability}

% \drkp{This section is going to absorb some of the stuff about
%   different implementations of serializability from ch.3. Putting this
%   off for now since i need to sort out what's going to appear there first.}

% \begin{outline}
% \item is just running on the snapshot enough to make sure we have
%   serializability?
% \item in some databases, yes. Specifically, those with commit ordering
%   (S2PL/OCC). There, don't need to take any read locks, do OCC
%   validation, etc. on read-only transactions 
% \item in Postgres: no problem if all we want is snapshot isolation;
%   same on other SI databases
% \item in Postgres, have SSI for serializability. Need to keep the
%   serializable state (siread locks)
% \item reads to the database might cause a r/w transaction to be
%   aborted, which is OK
% \item might also cause a r/o transaction to be aborted, which isn't OK
%   if it's pinned. Need to prevent this.
% \item only case we can't is when the r/o transaction is doing a query,
%   and the other transactions involved in the conflict have already
%   committed
% \item in that case, what we're really doing is saying that a certain
%   query can't be run on a certain snapshot --- because it exposes an
%   inconsistent state. So the application has to retry on a different one
% \item can avoid this if we're using a safe snapshot; anything can be
%   read there. Should prefer those.
% \end{outline}

\subsection{Tracking Query Validity}
\label{sec:storage:db:validity}

\txcache's second requirement for the storage layer is that it must
indicate the validity interval for every query result it returns. The
\txcache library needs this information to ensure transactional
consistency of cached objects. Recall that this interval is defined as
the range of timestamps for which the query would give the same
results. Its lower bound is the commit time of the most recent
transaction that added, deleted, or modified any tuple in the result
set. It may have an upper bound if a subsequent transaction changed
the result, or it may be unbounded if the result is still current. The
validity interval of a query always includes the timestamp associated
with the transaction in which it ran.

As previously noted, each tuple version in \postgres, and in other
multiversion concurrency control databases, is tagged with the ID of
the transaction that created it, and the transaction that deleted it
or replaced it with a newer version (if any). This effectively serves
as a validity interval for that tuple version, although we must
translate from \postgres's unordered transaction IDs to timestamps
that reflect the commit order of transactions. To accomplish this
translation, we modified \postgres to maintain a table in memory that
maps transaction IDs to logical timestamps, for transactions that
committed after the earliest pinned snapshot.

In the block store, each \command{get} query accessed exactly one
block, so the validity interval of the query result is simply the
validity interval of the block. In a relational database, however, a
\command{select} query may access many tuples. We might attempt to
compute the validity interval of a query by taking the intersection of
the validity intervals of all the tuples accessed during query
processing. This approach is overly
conservative: query processing often accesses many tuples that do not
affect the final result (consider the case of a sequential scan, which
reads every tuple in a relation, only to discard the ones that do not
match a predicate).

Instead, we compute the validity interval of the tuples that are
ultimately returned to the application.  We define the \emph{result
  tuple validity} to be the intersection of the validity times of the
tuples returned by the query. To compute it, we propagate the validity
intervals throughout query execution, and take the intersection of the
validity intervals of the returned tuples. If an operator, such as a
join, combines multiple tuples to produce a single result, the
validity interval of the output tuple is the intersection of its
inputs. (Aggregate operators require special handling; we discuss them
separately later.)

\begin{figure}[tp]
  \centering
  \includegraphics[width=0.9\textwidth]{figures/db-intervals.pdf}
  \caption[Example of tracking the validity interval for a
    read-only query]{Example of tracking the validity interval for a
    read-only query.  All four tuples match the query
    predicate. Tuples 1 and 2 match the timestamp, so their intervals
    intersect to form the result validity. Tuples 3 and 4 fail the
    visibility test, so their intervals join to form the invalidity
    mask. The final validity interval is the difference between the
    result validity and the invalidity mask.}
  \label{fig:db-intervals}
\end{figure}

\subsubsection{Dealing with Phantoms}

The result tuple validity, however, does not completely capture the
validity of a query, because of \emph{phantoms}. These are tuples that
did \emph{not} appear in the result, but would have if the query were
run at a different timestamp. For example, in
\autoref{fig:db-intervals}, tuples 1 and 2 are returned in the query
result; the result tuple validity, therefore, is $[44, 47)$, the
intersection of their validity intervals. However, this is not the
correct validity interval for the query result. Timestamp 44 should
not be included in the interval, because the query results would be
different if the query were run at this timestamp.  Tuple 3 does not
appear in the results because it was deleted before the query
timestamp, but would have been visible if the query were run before it
was deleted.  Similarly, tuple 4 is not visible because it was created
afterwards.

We capture this effect with the \emph{invalidity mask}, which is the
union of the validity times for all tuples that failed the
\emph{visibility check}, \ie were discarded because their timestamps
made them invisible to the transaction's snapshot. Throughout query
execution, whenever such a tuple is encountered, its validity interval
is added to the invalidity mask. In \autoref{fig:db-intervals}, that
includes tuples 3 and 4.

The invalidity mask is a conservative measure.  Visibility checks are
performed as early as possible in the query plan to avoid processing
unnecessary tuples. Some of these tuples might ultimately have been
discarded anyway, if they failed the query conditions later in the
query plan (perhaps after joining with another table). In these cases,
including these tuples in the invalidity mask is unnecessary.  While
being conservative preserves the correctness of the cached results, it
might unnecessarily constrain the validity intervals of cached items,
reducing the hit rate. In theory, this problem could be avoided
entirely by delaying the visibility check until just before returning
the query result, after processing is complete; however, this would be
unacceptably expensive because it might involve subjecting tuples to
expensive processing (\eg multiple joins or complex function
processing) only to discard the results in the end.

To ameliorate this problem, we continue to perform the visibility
check as early as possible, but during sequential scans and index
lookups, we evaluate the predicate before the visibility check. This
differs from \postgres's usual behavior with respect to sequential
scans, where it evaluates the cheaper visibility check first. Delaying
the visibility checks improves the quality of the invalidity mask,
particularly for sequential scans which must examine every tuple in a
relation. It does increase the processing cost when executing queries
with complex predicates -- which is why standard \postgres performs
the validity check first -- but the overhead is low for simple
predicates, which are most common. In our experiments
(\autoref{cha:evaluation}), we found that this change did not cause
any perceptible decrease in throughput for a web application.

Finally, the invalidity mask is subtracted from the result tuple
validity to give the query's final validity interval.  This interval
is reported to the \txcache library, piggybacked on each
\command{select} query result; the library combines these intervals to
obtain validity intervals for objects it stores in the cache. An
example of this reporting can be seen at the end of
\autoref{fig:storage:pinned-snapshot-example}, where the database
indicates that the validity interval of the \command{select} query is
$[2,3)$ by following the query result with the notice
``\command{select validity 2 3}''.

\subsubsection{Aggregates}

The approach described above for tracking validity intervals works
correctly with all standard classes of query operators except for one:
aggregates, such as the \command{count} and \command{sum}
functions. Aggregates are fundamentally different than other query
operators in that they do not commute with the visibility check. That
is, we discussed earlier the possibility of delaying the visibility
check until after query processing is complete, but this cannot be
done with aggregates: it wouldn't be correct to execute a
\command{count} query by gathering the set of all tuple versions,
ignoring validity, counting them, and \emph{then} trying to perform a
validity check.

Tracking the validity interval of aggregates requires an extra step,
compared to other query operators. With other operators, such as joins
or filters, it suffices to set the validity interval of the generated
tuple to the intersection of the validity intervals of the tuple or
tuple that generated it. For aggregates, we must also add the inverse
of that validity interval to the invalidity mask. The reason for this
extra step is that an aggregate, such as \command{count}, would
produce a \emph{different} result tuple at times outside the result's
validity interval. (This differs from operators like joins which
produce \emph{no} tuple at these other times.) Using the invalidity
mask to track this effect is appropriate, as the tuples corresponding
to different aggregate results are effectively a different type of
phantom.

\subsection{Managing Pinned Snapshots}
\label{sec:storage:pincushion}

The database modifications described in
\autoref{sec:storage:db:concurrency} explain how the database
implements pinned snapshots. However, the \txcache consistency
protocol requires pinned snapshots to be created on a regular basis,
and requires application servers to be aware of which snapshots are
pinned on the database, and their associated wall-clock timestamps. As
described in \autoref{sec:consistency-protocol:consistency}, the
\txcache library needs this to determine which pinned snapshots are
within a read-only transaction's application-specified staleness
limit.

The block store we described earlier does not require any of this
management of pinned snapshots because it provided access to
\emph{all} previous versions of data. However, if we wanted the block
store to discard sufficiently old data -- an important practical
concern -- it would also need to notify the application servers
as to which versions are available.

% One way to achieve this would be to make the DBMS responsible for
% tracking this information. The \txcache library on each application
% server could contact at the start of each transaction to obtain a list
% of pinned snapshots. However, this approach is undesirable because it
% increases the load on the database server, making it more likely to
% become a bottleneck. In particular, application servers would need to
% contact the database on each transaction even if all the data they
% access is available in the cache.

% We describe two approaches for disseminating and managing the list of
% pinned snapshots. The first is a centralized approach, but uses a
% separate daemon to track the snapshots rather than the database. The
% second approach is more scalable, using a multicast system to
% disseminate the information.

We employ another module called the \emph{pincushion} to manage the
set of pinned snapshots. It notifies the storage layer when to create
pinned snapshots, releases them (\command{unpin}) when they are no
longer needed, and notifies the application servers about which
snapshots are available.  This section describes its operation.

\subsubsection{Pincushion}

The \emph{pincushion} is a lightweight daemon that tracks and manages
the set of pinned snapshots. In theory, this functionality could be
incorporated into the DBMS itself. However, we chose to locate it in a
separate daemon in order to minimize the changes we had to make to the
database, and to reduce the load on the database (which can easily
become the bottleneck). This daemon can be run on the database host,
on a cache server, or elsewhere; in our experiments, we co-located it
with one of the cache servers.

The pincushion maintains a table of currently pinned snapshots,
containing the snapshot's ID, the corresponding wall-clock timestamp,
and the number of running transactions that might be using it. It
periodically (\eg every second; the frequency is a configurable system
parameter) sends a \command{pin} request to the database server,
pinning the current snapshot. It then adds that snapshot, and the
corresponding wall-clock time (as reported by the database) to the
table.


When the \txcache library on an application node begins processing a
read-only transaction, it requests from the pincushion all
sufficiently fresh pinned snapshots, \eg those pinned in the last 30
seconds.  The pincushion sends a list of matching snapshots, and flags
them as potentially in use, for the duration of the transaction. The
\txcache library can also request that the pincushion create a new
pinned snapshot; the library would do so if there are no
sufficiently fresh pinned snapshots already, \eg if snapshots are
taken every second but the application requests to run a transaction
with freshness limit 0.1 seconds.

The pincushion also periodically scans its list of pinned snapshots,
removing any unused snapshots older than a threshold by sending an
\command{unpin} command to the database.

The pincushion is accessed on every transaction, making it a potential
bottleneck for the scalability of the system. However, it performs
little computation, making this only an issue in large deployments.
For example, in our experiments (\autoref{cha:evaluation}), nearly all
pincushion requests received a response in under 0.2 ms, approximately
the network round-trip time.
%\drkp{Should look up the number of
%  reqs/sec the pincushion was handling; we have that data... somewhere.}

\subsubsection{Improving Scalability}

Given that the pincushion is a centralized service, it could become a
bottleneck in large systems. Accordingly, we would like to eliminate
the requirement for the pincushion to be accessed on every
transaction, in order to improve scalability.  We observe that the
pincushion serves two separate purposes: it allows transactions to
look up which snapshots are pinned, and it tracks which snapshots are
in use so that it can unpin old unused snapshots.  We address the
first goal by disseminating information about pinned snapshots using
multicast, and the second using timeouts.

The pincushion continues to be responsible for creating new pinned
snapshots periodically. However, rather than having application
servers contact the pincushion to learn which snapshots are pinned, we
distribute this information using a multicast mechanism. Whenever the
pincushion creates a new pinned snapshot, it sends a message
containing the snapshot's ID and timestamp to all application
servers. Each application server uses this to maintain a local copy of
the table of pinned snapshots. Note that this assumes the availability
of an application-level multicast system that can distribute messages
from the database to all application servers. This is a reasonable
assumption as these servers will typically be in the same datacenter;
moreover, our invalidation mechanism (\autoref{cha:invalidations})
also relies on a multicast mechanism.

Because application servers do not contact the pincushion when they
start transactions, the pincushion no longer knows which pinned
snapshots are in use. To garbage-collect pinned snapshots, we
therefore use a very simple strategy: the pincushion simply removes
them after a fixed timeout. When it does, it multicasts a message to
the application servers notifying them of this change so they can
update their table. Unlike the centralized approach, this timeout
strategy makes it possible for a transaction's snapshot to be released
before it has finished executing, potentially forcing it to abort
because the data is no longer available. However, given a sufficiently
long timeout, this possibility is unlikely: most transactions should
be short, as long-running transactions are well known to be
problematic for many reasons.
%\drkp{citation?}
Long-running
transactions can contact the pincushion to request a longer
timeout. Note, however, that forcing such transactions to abort may
even be desirable in some circumstances, as retaining old versions of
data indefinitely is not without cost.

%%% Local Variables: 
%%% mode: latex
%%% TeX-command-default: "Make"
%%% TeX-PDF-mode: t
%%% TeX-master: "main.tex"
%%% End: 

%  LocalWords:  Serializability multiversion tuple SnapshotData
%  LocalWords:  snapshotid timestamps timestamp lookups versioned
%  LocalWords:  scalability multicast invalidations multicasts
