% $Id: dp1.tex,v 1.6 2004/03/18 19:55:51 dan Exp $
\documentclass{article}
\input{../6033-preamble}
\usepackage{clrscode,graphicx,fancyhdr}

\begin{document}
\bibliographystyle{IEEEtran}
\pagestyle{empty}
\null \vfil
%\vskip 1em
\begin{center}
\par
\huge{A Concurrently-Staged Design \\for Surveillance Monitoring}
\vskip 4em
{\lineskip .75em
\Large{Dan Ports}\par
\Large{March 18, 2004}\par
\vskip 5em
\large{6.033 Design Project 1}\par
\large{Recitation \#9: Karger/Lesniewski-Laas}
}\end{center}
\newpage



\lhead{D. Ports}
\chead{A Concurrently-Staged Design for Surveillance Monitoring}
\rhead{\thesubsection}
\pagestyle{fancy}
\pagenumbering{roman}
\tableofcontents
\listoffigures
\newpage
\pagenumbering{arabic}

\section{Introduction}
\label{sec:introduction}

This design implements a Surveillance-At-Home system that achieves the
goals of fault isolation, graceful degradation under load, and
performance. We accomplish these goals using the Scalable Event-Driven
Architecture, dividing the processing into a series of stages that
operate concurrently. By using address space separation and strictly
limiting interaction between stages, the system achieves fault
isolation. A controller monitors system performance uses a feedback
algorithm to dynamically reallocate system resources and implement
load shedding, providing the system with high performance and graceful
degradation.

\section{Design Overview}
\label{sec:design-overview}

This report presents a design for the central computer of a video
surveillance system that uses both image processing and human spotters
to identify suspicious activity. The system receives video streams
from many cameras simultaneously, and divides them into short
segments. Each segment is transcoded into a standard image format, and
then passed through a vision module that identifies frames with
potential suspicious activity. Images that have received a high threat
index are stored in a queue. A web server responds to HTTP requests
from human spotters by sending an image from the queue. If the human
spotters agree that the image is suspicious, an appropriate action is
taken, such as notifying the operator.


\subsection{The SEDA Model}
\label{sec:seda}
This design is based on the \emph{Staged Event-Driven Architecture}
(SEDA) model introduced by Welsh \cite{welsh-phd}. This architecture
divides the processing of requests into \emph{stages}. Each stage
consists of a \emph{stage controller} that maintains a queue of requests
(``\emph{events}'') to be processed, and \emph{event handler}
processes that are invoked by the controller to handle each event.
Multiple handler processes for the same stage can operate
concurrently, up to a maximum number for the stage that is determined
dynamically. Individual stages only communicate with each other by
inserting events into each other's event queue through a well-defined
internal interface.

In this system, an event represents either a partially-processed
segment of video data from one camera, or an incoming HTTP request
from one of the spotters' web browsers. The processing sequence can be
viewed as a pipeline: events are processed, then transmitted to the
next stage. The data flow of events through stages is shown in
Figure~\ref{fig:dataflow}. Because incoming network traffic does not
naturally arrive conveniently partitioned into SEDA events, we create
\emph{interface stages} that receive data from the network, partition
it into appropriately-sized blocks (either one second of video data,
or a single HTTP request), then formats it as an event and passes it
to the next stage.

\begin{figure}[htbp]
  \centering
  \includegraphics[width=5.5in]{dataflowdiagram}
  \caption{Data flow between stages}
  \label{fig:dataflow}
\end{figure}

Stages communicate with each other by passing events. Once an event
handler process has completed its processing for a particular event,
it passes the result to the next stage's controller. As the stages
operate in different address spaces, inter-process communication must
be achieved through a sockets-based interface. Each stage controller
process listens on an internal port. To enqueue an event, event
handler processes establish a connection to the next stage's
controller via this port, and send the event data in the proper
marshalled form for the next stage.

In addition to the stage controllers that manage queues for each stage
and start event handler processes, a master controller process
oversees the stage controllers. The master controller monitors the
number of event handlers currently running at each stage and the queue
sizes.  It adaptively changes the maximum number of processes at each
stage to reflect the demands placed on the system by that stage's
processing: by starting more processes at a given stage, the kernel's
preemptive scheduler allocates more time to that stage. By dynamically
reallocating processing power, the master controller eliminates
bottlenecks as they arrive. In addition, the master controller
monitors the system load, and orders the stages to begin shedding load
when the system is overloaded. These feedback-based algorithms make
graceful degradation possible.


\subsection{Stages}
\label{sec:stages}

Seven discrete stages are used in the processing pipeline. These
stages are described in detail in Section~\ref{sec:design-description},
but we present a brief overview here:

\begin{description}
\item[Camera HTTP Accumulator] This interface stage receives data from
  the cameras via HTTP, and puts it in event form. It stores incoming
  data in a buffer associated with each camera. When one second of
  video data has been received, the contents of the buffer are passed
  as an event to the transcoder stage. This stage also opens the HTTP
  connections to the cameras.

\item[Transcoder] This stage converts the stream chunks received from
  the cameras into the standard JPEG format. It is a wrapper for the
  \proc{Transcode} procedure.

\item[Anomaly Detection] The Anomaly Detection stage is a wrapper for
  the \proc{Detect-Anomaly} procedure. It takes in a transcoded video
  stream, and outputs a threat index and the single most suspicious
  frame.
  
\item[Assignment] This stage handles the assignment of video frames to
  spotters. It has two input queues: one receiving scored frames from
  the Anomaly Detection stage, and one receiving HTTP requests from
  the HTTP Server Parser. It maintains an internal state table of
  recently arrived frames, and uses this to assign frames to spotters
  based on their threat index. The pairing of the HTTP request and
  selected frame is enqueued for the HTTP Server Reply stage.

\item[HTTP Server Accumulator] The HTTP Server Accumulator is an
  interface stage that listens on TCP port 80 for HTTP connections
  from the spotters, and buffers incoming data. Once a full HTTP request
  is received, it encodes it as an event and passes it to the
  HTTP Server Parser stage.

\item[HTTP Server Parser] This stage parses an incoming HTTP request
  from a spotter's web browser. It identifies whether the request is
  for a new image, or a ranking for a previous image, and passes it to
  the Assignment stage.
  
\item[HTTP Server Reply] This stage takes in an event containing a HTTP
  connection ID and a frame, and generates and sends a reply to the
  spotter's web browser.
\end{description}


\section{Design Description}
\label{sec:design-description}


\subsection{Operating System Requirements}
\label{sec:os}

This system design is intended to operate using standard PC hardware
and an operating system similar to Unix. We describe procedures using
Unix system call notation. Some of the features that we require of the
kernel include:
\begin{itemize}
\item Preemptive scheduling with a round-robin scheduler is required
  to ensure that processor time is divided equitably among all
  threads.
\item A sockets-based communication API is used. In particular, we
  require a non-blocking interface and the \proc{select} system call.
\item We must be able to use sockets to communicate between processes
  on the same machine.
\item The ability to create a shared memory segment between selected
  processes is required. Mutual-exclusion locks (``mutexes'') are used
  for synchronizing access to shared memory.
\end{itemize}


\subsection{Master Controller}
\label{sec:master-controller}

The master controller module has two primary functions. First, it 
creates the seven stage controller processes when the system is
starting up. Second, when the system is operating, it monitors actual
system performance and uses a feedback-based algorithm to adaptively
reallocate processor time in order to maintain optimum performance.

The master controller and stage controller processes communicate via a
memory segment they share (but the event handlers do not). This shared
memory segment is used for exchanging measurements and
parameters. Access to each shared variable is controlled by a
mutual-exclusion lock (``mutex''), which ensures that updates are
synchronized between processes. The following variables are included
in the shared memory segment:

\begin{itemize}
\item \id{load-shedding-fraction}: the fraction of incoming data to be
  dropped, set by the master controller
\item For each stage, \id{max-event-handlers}[\id{stage}]: the maximum
  number of event handler processes that can be invoked by each stage,
  set by the master controller
\item For each stage, \id{queue-length}[\id{stage}]: the number of events
  currently in the stage's queue, set by the stage controller
\item For each stage, \id{events-processed}[\id{stage}]: a counter variable
  for the number of events processed by the stage, incremented by the
  stage controller, and measured and reset every second by the master
  controller.
\end{itemize}

Creating the stage controller processes is a straightforward
procedure. When the master controller is started, it first allocates
the shared memory segment and mutexes described above. Then, for each
of the seven stages, it creates a new process with the \proc{fork}
system call. Each of the child processes then executes the appropriate
stage controller code. The master process then proceeds to its
monitoring loop.

The procedure for monitoring system performance is more complex. The
goal of this procedure is to adjust two parameters based on
measurement of system performance: the maximum number of event handler
processes to be used by each stage, and the fraction of incoming
data to drop.

The master controller monitors the performance only of the stages in
the \emph{camera pipeline}: the Transcoder, Anomaly Detection, and
Assignment stages, and these are the only stages it adjusts the
maximum number of event handlers for. The stages in the HTTP server
operate with a fixed maximum number of event handlers, because we
predict that the camera stream processing will be responsible for the
majority of the load, not the HTTP server. Hence, when we refer to
``all stages'' being monitored and adjusted, we mean ``all stages in
the camera pipeline''.

Since processor time is scheduled preemptively using a round-robin
scheduler, the percentage of processor time devoted to each stage is
proportional to the number of event handler processes executing it.
This allows bottlenecks to be avoided. Normally, if the system is
under load and one stage is operating especially slowly, it will act
as a bottleneck, and its queue will fill, causing the pipeline to back
up at this stage. Allocating more processes to this stage does not
increase the total processing power of the system; however, it does
ensure that more processing time is spent on the bottleneck stage
\emph{relative to the other stages}. This reallocation ensures that
all stages are processing data at approximately the same rate (in
terms of video segments per second), so no stage remains a bottleneck.

The algorithm used to determine the number of processes allocated to
each stage is based on computing the \emph{queued workload} for each
stage, which we define to be the product of the number of events
currently in that stage's queue and the average amount of time
required to process each event. The average amount of processing time
per event is computed by measuring the number of events processed by
the stage each second from the shared memory buffer, and calculating a
moving average over the last 15 seconds. The stage with the lowest
queued workload is assigned one event handler process, and larger
number of event handler processes are assigned to the other processes
in proportion to their relative queued workloads.

% FIXME: add compensation for CPU time
% FIXME: add footnote about multi-CPU systems

Although reallocating processor time by adjusting the per-stage
process count allows bottlenecks to be avoided, it does not deal
with the case when the system is overloaded: when the system simply
cannot process the video data fast enough to keep up with the incoming
streams. In this case, there is no other option but to start shedding
load by dropping segments of the incoming streams. Since it would be
wasteful to spend time processing a segment only to discard it later,
load shedding is performed at the earliest stage of the pipeline: by
the Camera HTTP Accumulator.

Load shedding is accomplished by automatically adjusting a parameter
that instructs the Camera HTTP Accumulator stage as to how much load
should be discarded. The critical insight here is that the amount of
data to be processed (i.e. not discarded) should be kept as close as
possible to the amount of data that the system is actually able to
process. Thus, we define the \emph{relative rate ratio}, which is
equal to the number of video segments that are processed each second
divided by the number of video segments that arrive each second (which
is equal to the number of cameras, since one segment is one second of
data from one camera). The number of segments processed per second can
be measured by using a moving average of the number of events handled
by the Assignment process each second. The proportion of video frames
to be dropped will be 1 minus the relative rate ratio.

\begin{figure}[htbp]
  \centering

  \begin{codebox}
    \Procname{$\proc{Master-Controller}()$}
    \li allocate shared memory segment and mutexes
    \li \For \id{stage-controller} \textbf{in} \id{all-stage-controllers}
    \li \Do
     \If $\proc{fork}() = 0$
     \li \Then
     \proc{Execute} \id{stage-controller}
     \End
     \End
    \li
    \li \While (1)
    \li \Do
    \proc{Sleep}(1 second)
    \li \For \id{stage} \textbf{in} \id{camera-pipeline-stages}
    \li \Do
    \proc{Acquire-Mutex}(\id{lock[stage]})
    \li $\id{events-processed-average[stage]} \gets
    \proc{Update-Moving-Average}(\id{events-processed[stage]})$
    \li $\id{events-processed[stage]} \gets 0$
    \li $\id{queued-workload[stage]} \gets \id{queue-length[stage]} /
    \id{events-processed-average[stage]}$
    \li \proc{Release-Mutex}(\id{lock[stage]})
    \End
    \li $\id{load-shedding-fraction} \gets 1 -
    (\id{events-processed-average}[\const{Assignment}] /
    \const{Max-Cameras})$
    \li \For \id{stage} \textbf{in} \id{all-stages}
    \li \Do
    \li $\id{max-event-handlers[stage]} \gets
    \id{queued-workload[stage]} / \min\{\id{queued-workload}\}$
    \End
    \End
  \end{codebox} 

  \caption{Master controller pseudocode}
  \label{fig:master-controller-code}
\end{figure}


\subsection{Common Procedures for Queue Management}
\label{sec:common-procedures}

Communication between stages in this architecture takes place only
through events in queues: one stage generates an event containing the
data for the next stage to process, and then transmits the event to
the next stage's queue. Since this design pattern is ubiquitous in
this system, we define a standard protocol for placing events in
another stage's queue, and a pair of common procedures that implement
this protocol. These procedures are used in nearly every module.


\subsubsection{The Protocol}

We define a protocol that makes possible a single operation: one stage
can place an event in another stage's queue. To perform this
operation, the stages communicate via sockets-based connections within
the system. A datagram-based protocol such as UDP is used; since the
connections are only internal to the system, we do not need to worry
about packet loss or corruption. Each stage controller process listens
on a fixed internal port, which we refer to as
\const{Ports}[\id{stage}]. To enqueue an event, the stage creating the
event connects to the destination stage on port
\const{Ports}[\id{destination-stage}]. It then transmits the event in
a marshalled form: each field in the event is transmitted in
sequence.

\subsubsection{Transmitting an Event}

We define the common procedure \proc{Send-Event-To-Stage} for placing
an event in another stage's queue, according to the protocol above:

\begin{codebox}
  \Procname{\proc{Send-Event-To-Stage}(\id{event},
  \id{destination-stage})}
  \li \id{port} $\gets$ \const{Ports}[\id{destination-stage}]
  \li \id{buffer} $\gets$ \proc{Marshal}(\id{event})
  \li \proc{Send}(\id{port}, \id{buffer})
  \li    \proc{Acquire-Mutex}(\id{lock}[\id{destination-stage}])
    \li $\id{queue-length}[\id{destination-stage}] \gets
    \id{queue-length}[\id{destination-stage}] + 1$
    \li
    \proc{Release-Mutex}(\id{lock}[\id{destination-stage}])
 
\end{codebox}

\subsubsection{Receiving Events}

We define the common procedure \proc{Receive-Events-Into-Queue} for
receiving events from other stages and placing them into a queue. It
takes as arguments the file descriptor for the socket on which the
events are to be received, and the queue to which the events are to be
added. This procedure is designed to be called from a stage controller
process's event loop.

\begin{codebox}
  \Procname{\proc{Receive-Events-Into-Queue}(\id{fd}, \id{queue})}
  \li \id{new-queue} $\gets$ \id{queue}
  \li \While (\proc{Data-Available-On-FD}(\id{fd}))
  \li \Do
  \id{buffer} $\gets$ \proc{Recv}(\id{fd})
  \li \If (\proc{Is-Valid-Event}(\id{buffer}))
  \li \Then
  \id{event} $\gets$ \proc{Unmarshal}(\id{buffer})
  \li \id{new-queue} $\gets$ \proc{Add-To-Queue}(\id{new-queue},
  \id{event})
  \End
  \li    \proc{Acquire-Mutex}(\id{lock}[\id{stage}])
    \li $\id{queue-length}[\id{stage}] \gets
    \id{queue-length}[\id{stage}] - 1$
    \li
    \proc{Release-Mutex}(\id{lock}[\id{stage}])
  \End
  \li \Return \id{new-queue}
\end{codebox}

\subsection{Camera HTTP Accumulator}
\label{sec:camera-http-accumulator}

The Camera HTTP Accumulator stage is the interface stage that handles
the HTTP connections to the cameras. Upon startup, it opens a
connection to each camera, sends the HTTP GET request to initiate the
video stream, and allocates a buffer for each connection.

Unlike most of the other stages, it does not use event handler
processes. Instead, after startup, it operates as a loop. During each
iteration of the loop, it waits for one second, then reads from each
HTTP connection into its associated buffer. The \emph{non-blocking}
variant of the \proc{Read} system call is used.

Once data is read from each connection, the stage determines what to
do with the data. If the system is under normal load, all data is sent
to the next stage. If the system is overloaded, data from
$n$ connections is discarded, where
\[n = \id{load-shedding-fraction} \times \const{Num-Cameras}\]
The stages chooses which data to discard by maintaining a
table of when each connection's data was last processed. It then
chooses the $(\const{Num-Cameras} - n)$ least-recently-processed
buffers to process, sends their data to the next stage, and discards
the rest. The last-processed-time table is updated, and all buffers
are cleared in preparation for the next read sequence.

Data from each buffer that is not discarded is marshalled into event
form: the raw data is associated with the camera ID number, and
encapsulated in an event data structure. This event is then passed to
the Transcoder stage using the standard \proc{Send-Event-To-Stage}
procedure.


\subsection{Transcoder}
\label{sec:transcoder}

The Transcoder stage receives raw data from the Camera HTTP
Accumulator stage, and converts it into a sequence of JPEG images, a
standard format. The conversion takes place using the provided
\proc{Transcode} function.

This stage consists of a stage controller that receives events in its
queue, and event handler processes that essentially wrap the
\proc{Transcode} function. The stage controller operates as a
loop. During each iteration of the loop, it uses the standard
\proc{Receive-Events-Into-Queue} function to accept any events that
may have been sent to it on its port and place them into the queue. If
the queue contains an event and the number of processes currently
running is less than the maximum number of processes allowed for this
stage, a new event handler process is created in a new address space.

The event handler process takes the raw data from the event, and calls
the \proc{Transcode} function to transcode it. Once this transcoding
has completed, it then creates a new event containing the transcoded
images and the camera ID, and sends it to the Anomaly Detection stage
using the standard \proc{Send-Event-To-Stage} procedure.

\begin{figure}[htbp]
  \centering
  \begin{codebox}
    \Procname{\proc{Transcoder-Stage-Controller}}
    \li \While (1)
    \li \Do
    \id{queue} $\gets$ \proc{Receive-Events-Into-Queue}
    \li \While (\id{queue} is not empty \textbf{and}
    \id{event-handlers-running} <
    \id{max-event-handlers}[\const{Transcoder}])
    \li \Do
    \proc{Acquire-Mutex}(\id{lock}[\const{Transcoder}])
    \li $\id{events-processed}[\const{Transcoder}] \gets
    \id{events-processed}[\const{Transcoder}] + 1$
    \li
    \proc{Release-Mutex}(\id{lock}[\const{Transcoder}])
    \li $\id{event} \gets \proc{Remove-From-Queue}(\id{queue})$
    \li \If \proc{Fork}() = 0
    \li \Then
    \proc{Exec}(\proc{Transcoder-Event-Handler})
    \End
    \End
    \End
  \end{codebox}

  \begin{codebox}
    \Procname{\proc{Transcoder-Event-Handler}}
    \li \id{output-buffer} $\gets$ \proc{Transcode}(\id{event}.data)
    \li \id{new-event}.transcoded-data $\gets$ \id{output-buffer}
    \li \id{new-event}.camera-id $\gets$ \id{event}.camera-id
    \li \proc{Send-Event-To-Stage}(\id{new-event}, \const{Anomaly-Detection})
  \end{codebox}
  \caption{Transcoder stage pseudocode}
  \label{fig:transcoder-pseudocode}
\end{figure}


\subsection{Anomaly Detection}
\label{sec:anomaly-detection}

The Anomaly Detection stage uses a provided vision procedure
(\proc{Detect-Anomaly}) to identify suspicious frames in the data
supplied from the transcoder stage, and determine a threat index
between 0 and 1 that indicates how suspicious the frame is.

The stage operates essentially like the Transcoder stage: it is a
simple wrapper for the \proc{Detect-Anomaly} function. The output from
this function is encoded into an event and sent to the Assignment
stage.

We omit the pseudocode for the
\proc{Anomaly-Detection-Stage-Controller} procedure, as it is
virtually identical to the analogous procedure for the Transcoder
module.

\begin{figure}[htbp]
  \centering
  \begin{codebox}
    \Procname{\proc{Anomaly-Detection-Event-Handler}}
    \li \id{vision-result} $\gets$ \proc{Detect-Anomaly}(\id{event}.transcoded-data)
    \li \id{new-event}.index $\gets$ \id{vision-result}.threat
    \li \id{new-event}.frame $\gets$ \id{event}.transcoded-data[\id{vision-result}.frame-pointer]
    \li \id{new-event}.camera-id $\gets$ \id{event}.camera-id
    \li \proc{Send-Event-To-Stage}(\id{new-event}, \const{Assignment})
  \end{codebox}
  \caption{Anomaly Detection stage pseudocode}
  \label{fig:anomaly-detection-pseudocode}
\end{figure}


\subsection{Assignment}
\label{sec:assignment}
The Assignment stage assigns video frames to spotters. It receives
video frames as events from the Anomaly Detection module, and requests
for images from the HTTP Server Parser stage. Hence, unlike the other
stages, it must maintain two queues for receiving a different type of
events from each of these two stages. This stage also differs from the
other stages in that it does not use event handler threads; all
processing takes place in the stage controller process. This design
choice was made because the Assignment stage must maintain extensive
state: it must keep a table of recently-received suspicious frames
that are waiting to be assigned to spotters.

We naturally want to ensure that the most suspicious frames are
the first ones directed to the spotters. However, it is also desirable
that suspicious frames from all cameras are processed; even if one
camera is generating many very suspicious frames, suspicious frames
from other cameras should also be processed. Hence, we maintain another
table associating each camera ID with the time a frame from it was
last processed. We generate a ranking for each frame in the queue of
how long it has been since a frame from that camera was last
processed, and then compute a \emph{weight function}
\[w(\id{frame}) = \alpha(\id{frame}.\mbox{threat-index}) +
\beta(\id{frame}.\mathrm{ranking})\]
The constants $\alpha$ and $\beta$
indicate how heavily we wish to weight the threat index relative to
the time-since-last-processed ranking. We leave the exact numeric values of
these parameters unspecified, since they depend on the range of
variation of the values returned by the \proc{Detect-Anomaly}
function. We also define a constant $\epsilon$ that represents the
minimum threat index required for a frame to be considered suspicious
at all; frames whose threat index is less than $\epsilon$ will be
simply discarded instead of being assigned to spotters. We leave the
value of $\epsilon$ unspecified for the same reason.

The stage controller process operates as a loop. On each iteration of
the loop, it uses the \proc{Receive-Events-Into-Queue} procedure
\emph{twice}, to receive events from the sockets corresponding to each
of its input queues, and places the resulting events into separate
queue variables.

For each event received from the Anomaly Detection
stage, the frame, threat index, and camera ID are placed in the table
of received frames, if the threat index is greater than $\epsilon$. If
the table grows beyond some pre-specified maximum size (which we
anticipate to be at least one order of magnitude larger than the
number of spotters), the table is truncated to the maximum by removing
the frames whose weight function is lowest.

For each event received from the HTTP Server Parser stage, we
determine whether the event represents a request for a new frame, or a
spotter's evaluation of a frame.

If the event is a request for a new frame, the
Assignment stage controller selects the frame from the table that has
the highest value of the weight function. It removes this frame from
the table and places it in a new event. This new event will also
contain the camera ID number from, and the HTTP stream ID from the
event received from the HTTP Server Parser. The resulting event is
sent to the HTTP Server Reply stage. The table of
time-since-last-processed rankings for the cameras is then updated,
and the weight functions for all frames recalculated.

If instead the event is an evaluation of a frame, the system checks
whether the spotter evaluated the frame as suspicious. If so, the
system takes the appropriate action, i.e. notifying the operator.

% FIXME: add suspicious frame reply stuff.
% FIXME: add pseudocode

\subsection{HTTP Server Accumulator}
\label{sec:http-server-accumulator}

The HTTP Server Accumulator is an interface stage that performs a
function analogous to the Camera HTTP Accumulator. It listens on port
80 for incoming connections, reads from each open connection into a
buffer, and converts incoming requests to events. However, it differs
from the Camera HTTP Accumulator in three main respects: it does not
perform load-shedding, it does not read from a fixed set of camera
connections but rather from whatever clients have connected to the
server, and it converts incoming data into events not by dividing it
into one-second segments but rather by detecting a HTTP request.

Like the Camera HTTP Accumulator, this stage operates as a loop rather
than using event handler processes. During each iteration of the loop,
the \proc{Select} system call is used to determine which connections
have data available. For each such connection, the non-blocking
variant of the \proc{Read} system call is used to read all available
data into its associated buffer. This stage then checks the received
data for a double-carriage-return sequence that indicates the end of a
HTTP request; if one exists, it packages the connection file
descriptor and the received data into an event and sends it to the
HTTP Server Parser stage.

If the \proc{Select} system call indicates that a new connection has
been opened, the file descriptor for this new connection will be added to
the set of file descriptors for open connections that is passed to
\proc{Select}. In addition, a new buffer is allocated to hold the data
for this connection.

% FIXME: add pseudocode


\subsection{HTTP Server Parser}
\label{sec:http-server-parser}

The HTTP Server Parser stage receives raw data from the HTTP Server
Accumulator stage, and parses it as a HTTP request.

This stage operates using a stage controller and event handler
threads, much like the Transcoder or Anomaly Detection
stages. However, rather than dynamically determine the maximum number
of event handler processes that can be operating concurrently, we use
a fixed maximum limit which we anticipate to be on the order of 10. We
use a fixed limit rather than a load-adaptive algorithm for
simplicity, since we anticipate that the HTTP server portion of this
system will not be responsible for nearly as much of the load as the
camera-processing stages.

The stage controller loop receives events from the stage's socket
and stores them in the queue, and invokes event handler processes when
events are present in the queue and able to be processed.

The event handler processes parse the received data using the HTTP
protocol. If the request is a valid HTTP request, the handler
determines whether it is a request for a new frame, or a spotter's
response to a previous frame. It then creates a new event consisting
of this type of request, the connection's file descriptor. If the
request is a response to a previous frame, the event also includes the
frame ID and the spotter's evaluation of it (suspicious or not
suspicious). This event is sent to the Assignment process. If instead
the request is invalid, the event handler sends an error message on
the HTTP connection.

% FIXME: add RFC reference
% FIXME: add pseudocode


\subsection{HTTP Server Reply}
\label{sec:http-server-reply}

The HTTP Server Reply stage sends replies to HTTP requests for new
images. It receives an event from the Assignment stage that represents
a pairing of a HTTP request with a frame that has been assigned to it,
and sends that frame to that connection.

This stage also operates using a stage controller and event handler
threads. The stage controller loop receives events from the stage's
socket and stores them in the queue, and invokes event handler
processes when events are present in the queue and able to be
processed. We use the same fixed limit for the maximum number of event
handler processes as in the HTTP Server Parser stage, for the same
reasoning.

The event handler processes take the video frame contained in the
event, and generate a HTTP reply encoding a HTML page containing this
image. This reply is then sent to the spotter's web client via the
connection ID included in the event.

% FIXME: maybe include pseudocode?

\section{Analysis}
\label{sec:analysis}

The SEDA-based design presented in this paper achieves the three major
design objectives for this system: \emph{fault isolation},
\emph{graceful degradation under load}, and \emph{performance}.

\subsection{Fault Isolation}
\label{sec:fault-isolation}

The principal design objective for this system is fault isolation:
no module should be able to cause the entire system to crash. In
particular, the supplied \proc{Transcode} and \proc{Detect-Anomaly} procedures
are known to be unreliable.

The SEDA model naturally achieves fault isolation. The system is
divided into discrete stages, and each stage can only communicate with
the other stages through the narrowly-defined interface for placing
events in queues. Since each event transmitted to a stage is validated
before being enqueued, this prevents any failure of one stage from
affecting any others.

Moreover, the stages that use event handler processes, including the
Transcoder and Anomaly Detection stages, invoke these processes in
their own address spaces. This not only prevents a malfunctioning
event handler from causing a failure in a different stage, it prevents
the failure from affecting other event handlers in the \emph{same}
stage. This means that if, for example, the \proc{Detect-Anomaly}
causes a spurious memory write, the event handler will simply be shut
down, causing the event that triggered the error to be lost. Losing
this event is the desired effect, because attempting to repeat the
processing may cause the same error. All other parts of the system,
including the other events in the Anomaly Detection stage's queue, and
any other event handler processes running the \proc{Detect-Anomaly}
procedure will continue to operate as though nothing happened.

% FIXME: Add fault recovery


\subsection{Graceful Degradation}
\label{sec:graceful-degradation}

Another critical design objective is graceful degradation under heavy
load. Though the processor should be powerful enough to handle the
normal load presented to the system, brief periods of unexpectedly
high load may present a higher load than the system can handle. In
this case, the system will be forced to discard some incoming data. It
should continue to process data from each connection, ensuring that no
camera is starved for attention.

Our SEDA design with adaptive process allocation and load shedding
accomplishes this objective. By monitoring the system state and
measuring current performance, the feedback algorithm in the master
controller is able to tailor the system parameters in order to achieve
optimal resource allocation.

The following scenario demonstrates how the system can respond to an
unexpected increase in load:

Suppose the system is monitoring 1000 cameras when 100 of the cameras
suddenly begin to observe much more motion than usual, causing the
\proc{Detect-Anomaly} procedure to run very slowly. Since the
Transcoder and the other stages will still be running at the same
rate, the Anomaly Detection stage's queue will grow as it fills up
more rapidly than events can be processed.

When the master controller process takes its next set of measurements,
it will observe that the queue of the Anomaly Detection stage has
grown, and the average rate at which events are processed has also
increased. Hence, the queue workload for this stage will be
substantially higher than normal. In response, the master controller
will raise the Anomaly Detection stage's maximum-event-handler
allocation. This will allow the Anomaly Detection stage to start more
event handler processes, causing the kernel scheduler to allocate more
time to the stage. Thus, the queue will be filled more slowly because
the Transcoder stage will process events at a slower rate, and
emptied more rapidly because the Anomaly Detection stage will process
events at a faster rate. The feedback algorithm will continue to
adjust the maximum-event-handler allocations for each stage in order
to achieve a dynamic equilibrium. This eliminates the bottleneck at
the Anomaly Detection stage.

Though the process reallocation accounts for the bottleneck, it does
not change the fact that the system does not have enough processing
power to perform the processing for every camera and still keep up
with the input streams. By monitoring the rate at which events are
processed in the queue between the Anomaly Detection stage and the
Assignment stage, the master controller observes that the camera
pipeline is not processing segments as fast as they are arriving from
the network. In response, it instructs the Camera HTTP Accumulator
stage to begin shedding load, proportionally to the
difference between the rate at which data is arriving and the rate at
which it is being processed. The Camera HTTP Accumulator begins
dropping segments from some of the cameras. It does so in a manner
that ensures that segments are dropped in equal proportion for each of
the cameras. As a result, the system continues to process input from
each camera, though some frames are being dropped.

When the load presented by the cameras returns to normal, the master
controller will observe that the rate at which the system can process
data is increasing. It will then direct the Camera HTTP Accumulator
stage to decrease the load-shedding fraction until the full data
stream is once again being processed.


\subsection{Performance and Scalability}
\label{sec:performance}

This design leverages the inherent performance advantages of the SEDA
architecture. Through the use of massive event-driven concurrency and
adaptive load shedding such as that which we have described here,
applications developed with SEDA have been able to achieve impressive
performance results. The designers of the SEDA architecture presented
a high-performance web server, Haboob, that was able to outperform the
industry-standard Apache web server and the high-performance Flash web
server in performance tests, particularly under high load
\cite{seda-sosp}. This accomplishment is particularly impressive in
light of the fact that their web server was written in Java and
contained no optimizations other than those inherent to SEDA. Though
no performance testing has been done on our surveillance monitoring
system, we anticipate similar results.

The design can also be easily adapted to a larger scale with only a
few modifications. In particular, though we assume a single-CPU
computer for this design, the highly-concurrent nature of SEDA makes
it well-suited for a multiple-CPU system. In fact, its full
performance advantages cannot be realized without using multiple
CPUs. It is also possible to adapt the system to run stages on
\emph{separate} computer systems: since communication between
processes takes place through a sockets-based interface, the other
stage controllers may just as well be on a different computer
connected via a network. Clearly some modifications would need to be
made to the details of the design; for example, the master controller
and stage controllers could no longer communicate via shared
memory. However, most of the design architecture can be used without
major modifications.

\section{Conclusion}
\label{sec:conclusion}

We have presented a design for a surveillance monitoring system that
uses the SEDA architecture. The key features of our approach are a
decomposition of the process into stages, concurrent event-driving
processing of each stage, narrowly-defined interfaces for
communications between stages, and adaptive algorithms for allocating
resources and shedding load. These features allow our system to
achieve the design objectives of fault isolation, high performance,
graceful degradation under heavy load, and scalability.

\section*{Acknowledgements}
The architecture for this implementation is based on the SEDA
architecture developed by Matt Welsh as part of his Ph.D. thesis research;
more information is available at \\
\texttt{http://www.eecs.harvard.edu/~mdw/proj/seda/}.

Austin Clements provided many invaluable insights during the design
process. Chris Lesniewski-Laas also provided helpful comments.



\bibliography{dp1}

\end{document}
