Wednesday, August 17, 2011

Implementing a message queue with MSSQL

I’m working on a project that needs some background application servers to execute long-running tasks in order to offload the web server. Architecturally, there are a couple of ways you could do it, but I want to leverage existing infrastructure--that means using Microsoft SQL Server. One of the things I wanted to avoid was polling SQL Server for new tasks. SQL Server provides a cool technology built right into the database engine called Service Broker, which provides native support for messaging and queuing applications. One of the things you can do with Service Broker is set up a query notification dependency between an application and an instance of SQL Server, so that the application receives a notification from SQL Server when the result of a query against a table changes. .NET provides a class called SqlDependency to register for these query notifications. Here is a toy database:
-- Creates the QueueSample demo database and then applies its database-level options,
-- one ALTER DATABASE statement per batch.
-- NOTE(review): the data and log file paths are hard-coded and look like a default
-- local instance layout (MSSQL10_50.MSSQLSERVER) -- adjust them for your server.
USE [master]
GO
/****** Object:  Database [QueueSample]    Script Date: 08/17/2011 17:49:11 ******/
CREATE DATABASE [QueueSample] ON  PRIMARY 
( NAME = N'QueueSample', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL10_50.MSSQLSERVER\MSSQL\DATA\QueueSample.mdf' , SIZE = 2048KB , MAXSIZE = UNLIMITED, FILEGROWTH = 1024KB )
LOG ON 
( NAME = N'QueueSample_log', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL10_50.MSSQLSERVER\MSSQL\DATA\QueueSample_log.ldf' , SIZE = 1024KB , MAXSIZE = 2048GB , FILEGROWTH = 10%)
GO
ALTER DATABASE [QueueSample] SET COMPATIBILITY_LEVEL = 100
GO
-- Enable full-text support only when the full-text component is installed on this instance.
IF (1 = FULLTEXTSERVICEPROPERTY('IsFullTextInstalled'))
begin
EXEC [QueueSample].[dbo].[sp_fulltext_database] @action = 'enable'
end
GO
-- Database-level defaults for the ANSI / session options.
ALTER DATABASE [QueueSample] SET ANSI_NULL_DEFAULT OFF
GO
ALTER DATABASE [QueueSample] SET ANSI_NULLS OFF
GO
ALTER DATABASE [QueueSample] SET ANSI_PADDING OFF
GO
ALTER DATABASE [QueueSample] SET ANSI_WARNINGS OFF
GO
ALTER DATABASE [QueueSample] SET ARITHABORT OFF
GO
ALTER DATABASE [QueueSample] SET AUTO_CLOSE OFF
GO
ALTER DATABASE [QueueSample] SET AUTO_CREATE_STATISTICS ON
GO
ALTER DATABASE [QueueSample] SET AUTO_SHRINK OFF
GO
ALTER DATABASE [QueueSample] SET AUTO_UPDATE_STATISTICS ON
GO
ALTER DATABASE [QueueSample] SET CURSOR_CLOSE_ON_COMMIT OFF
GO
ALTER DATABASE [QueueSample] SET CURSOR_DEFAULT  GLOBAL
GO
ALTER DATABASE [QueueSample] SET CONCAT_NULL_YIELDS_NULL OFF
GO
ALTER DATABASE [QueueSample] SET NUMERIC_ROUNDABORT OFF
GO
ALTER DATABASE [QueueSample] SET QUOTED_IDENTIFIER OFF
GO
ALTER DATABASE [QueueSample] SET RECURSIVE_TRIGGERS OFF
GO
-- Turns on Service Broker for this database. This is the setting the sample relies on:
-- the query notifications consumed by the application are delivered through Service Broker,
-- and it has to be enabled per database.
ALTER DATABASE [QueueSample] SET  ENABLE_BROKER
GO
ALTER DATABASE [QueueSample] SET AUTO_UPDATE_STATISTICS_ASYNC OFF
GO
ALTER DATABASE [QueueSample] SET DATE_CORRELATION_OPTIMIZATION OFF
GO
ALTER DATABASE [QueueSample] SET TRUSTWORTHY OFF
GO
ALTER DATABASE [QueueSample] SET ALLOW_SNAPSHOT_ISOLATION OFF
GO
ALTER DATABASE [QueueSample] SET PARAMETERIZATION SIMPLE
GO
ALTER DATABASE [QueueSample] SET READ_COMMITTED_SNAPSHOT OFF
GO
ALTER DATABASE [QueueSample] SET HONOR_BROKER_PRIORITY OFF
GO
-- Availability, recovery model and access mode.
ALTER DATABASE [QueueSample] SET  READ_WRITE
GO
ALTER DATABASE [QueueSample] SET RECOVERY FULL
GO
ALTER DATABASE [QueueSample] SET  MULTI_USER
GO
ALTER DATABASE [QueueSample] SET PAGE_VERIFY CHECKSUM
GO
ALTER DATABASE [QueueSample] SET DB_CHAINING OFF
GO
EXEC sys.sp_db_vardecimal_storage_format N'QueueSample', N'ON'
GO
-- Everything from here on runs inside the QueueSample database.
USE [QueueSample]
GO
/****** Object:  User [queuesampleuser]    Script Date: 08/17/2011 17:49:11 ******/
-- Maps the server login [queuesampleuser] to a database user whose default schema is dbo.
-- Assumes the login already exists on the instance; this script does not create it.
-- NOTE(review): no GRANT statements or role memberships are visible in this script.
-- Presumably the account the application connects with also needs rights on the queue
-- tables and the permissions required for query notifications -- confirm where those are set up.
CREATE USER [queuesampleuser] FOR LOGIN [queuesampleuser] WITH DEFAULT_SCHEMA=[dbo]
GO
/****** Object:  Table [dbo].[QueueItems]    Script Date: 08/17/2011 17:49:13 ******/
-- Pending work items: this is the table the application watches for changes and reads
-- candidates from. Each row carries an action name and its parameters.
-- The SET options below are switched on for the CREATE TABLE batch; ANSI_PADDING is
-- switched off again afterwards.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[QueueItems](
-- Identity column (seed 1, increment 1); also the clustered primary key.
[QueueItemId] [int] IDENTITY(1,1) NOT NULL,
[Action] [varchar](50) NOT NULL,
[Parameters] [varchar](8000) NOT NULL,
CONSTRAINT [PK_QueueItems] PRIMARY KEY CLUSTERED 
(
[QueueItemId] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
/****** Object:  Table [dbo].[ProcessedQueueItems]    Script Date: 08/17/2011 17:49:13 ******/
-- Items that a worker has claimed. A worker claims an item by inserting a row here with the
-- item's QueueItemId; the primary key on QueueItemId is what allows only one such insert
-- per queue item to succeed.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[ProcessedQueueItems](
-- Not an identity column: the value is copied from the claimed row.
[QueueItemId] [int] NOT NULL,
[Action] [varchar](50) NOT NULL,
[Parameters] [varchar](8000) NOT NULL,
-- The sample application stores a per-process GUID string in this column.
[MachineName] [varchar](50) NOT NULL,
[Start] [datetime] NOT NULL,
-- Nullable; the sample application inserts NULL when it claims an item.
[End] [datetime] NULL,
CONSTRAINT [PK_ProcessedQueueItems] PRIMARY KEY CLUSTERED 
(
[QueueItemId] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO

Notice that Service Broker has to be enabled per database (the ALTER DATABASE [QueueSample] SET ENABLE_BROKER statement above). Here is the class used to register to receive query notifications:
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using SqlDependencyConsole.Properties;

namespace SqlDependencyConsole
{
    /// <summary>
    /// Callback signature for <see cref="JobQueue.QueueChanged"/>.
    /// </summary>
    public delegate void QueueChangeHandler();

    /// <summary>
    /// Watches [dbo].[QueueItems] through a SQL Server query notification and lets a worker
    /// claim one pending item at a time.
    /// </summary>
    /// <remarks>
    /// An item is claimed by inserting a row with the same QueueItemId into
    /// [dbo].[ProcessedQueueItems] and deleting it from [dbo].[QueueItems]. Both statements run
    /// in one transaction, so a failure between them cannot leave an item claimed but still queued.
    /// <see cref="SqlDependency"/> may raise its change event on a different thread than the one
    /// that executed the command, so every use of the shared connection and of the current
    /// dependency is serialized on a private lock.
    /// </remarks>
    public class JobQueue : IDisposable
    {
        // Identifies this process; written to the MachineName column when an item is claimed.
        private static readonly Guid GUID = Guid.NewGuid();

        // Guards _SqlConnection, _SqlDependency and _Disposed.
        private readonly object _SyncRoot = new object();

        private SqlConnection _SqlConnection;
        private SqlDependency _SqlDependency;
        private bool _Disposed;

        /// <summary>
        /// Raised after a change notification for the queue query has been received.
        /// Handlers may be invoked on a thread other than the one that called
        /// <see cref="CheckQueue"/>.
        /// </summary>
        public event QueueChangeHandler QueueChanged;

        /// <summary>
        /// Restarts the <see cref="SqlDependency"/> listener for the configured connection string
        /// and creates the (still closed) connection used by <see cref="CheckQueue"/>.
        /// </summary>
        public JobQueue()
        {
            SqlDependency.Stop(Settings.Default.QueueSampleConnectionString);
            SqlDependency.Start(Settings.Default.QueueSampleConnectionString);
            _SqlConnection = new SqlConnection(Settings.Default.QueueSampleConnectionString);
        }

        /// <summary>
        /// Safety net for callers that never call <see cref="Dispose()"/>: still stops the
        /// notification listener, but never lets an exception escape the finalizer.
        /// </summary>
        ~JobQueue()
        {
            Dispose(false);
        }

        /// <summary>
        /// Stops the notification listener and releases the connection.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases resources held by this instance.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
        /// finalizer, in which case other managed objects are not touched.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                lock (_SyncRoot)
                {
                    if (_Disposed)
                    {
                        return;
                    }

                    _Disposed = true;

                    if (_SqlDependency != null)
                    {
                        _SqlDependency.OnChange -= OnChange;
                        _SqlDependency = null;
                    }

                    _SqlConnection.Dispose();
                }
            }

            try
            {
                SqlDependency.Stop(Settings.Default.QueueSampleConnectionString);
            }
            catch (Exception ex)
            {
                // Neither Dispose nor a finalizer should throw; report only on the explicit path.
                if (disposing)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }

        /// <summary>
        /// Reads the pending queue items (registering for a change notification if none is
        /// currently registered) and tries to claim the first item no other worker has claimed.
        /// </summary>
        /// <returns>
        /// The QueueItemId of the item this process claimed, or <c>null</c> when the queue is
        /// empty or every item could not be claimed.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
        /// <remarks>
        /// Failures while opening the connection or reading the queue propagate to the caller.
        /// Failures while claiming an individual item are written to the console and the next
        /// item is tried.
        /// </remarks>
        public int? CheckQueue()
        {
            lock (_SyncRoot)
            {
                if (_Disposed)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }

                try
                {
                    DataTable pending = LoadPendingItems();
                    return ClaimFirstAvailable(pending);
                }
                finally
                {
                    // The connection is only held open for the duration of one check.
                    _SqlConnection.Close();
                }
            }
        }

        // Runs the watched SELECT and returns its rows; attaches a new SqlDependency first when
        // there is no live one. Must be called with _SyncRoot held.
        private DataTable LoadPendingItems()
        {
            using (SqlCommand select = new SqlCommand("SELECT [QueueItemId],[Action],[Parameters] FROM [dbo].[QueueItems]", _SqlConnection))
            {
                select.Notification = null;

                SqlDependency created = null;
                if (_SqlDependency == null)
                {
                    Console.WriteLine("Creating sql dep");
                    created = new SqlDependency(select);
                    created.OnChange += new OnChangeEventHandler(OnChange);
                    _SqlDependency = created;
                }

                try
                {
                    if (_SqlConnection.State == ConnectionState.Closed)
                    {
                        _SqlConnection.Open();
                    }

                    DataTable table = new DataTable();
                    using (SqlDataReader reader = select.ExecuteReader())
                    {
                        table.Load(reader);
                    }

                    return table;
                }
                catch
                {
                    // The command never ran, so the dependency was never registered with the
                    // server. Forget it, otherwise no later call would ever register a new one.
                    if (created != null)
                    {
                        created.OnChange -= OnChange;
                        if (ReferenceEquals(_SqlDependency, created))
                        {
                            _SqlDependency = null;
                        }
                    }

                    throw;
                }
            }
        }

        // Walks the rows in order and returns the id of the first one this process manages to
        // claim, or null. Must be called with _SyncRoot held and the connection open.
        private int? ClaimFirstAvailable(DataTable pending)
        {
            foreach (DataRow row in pending.AsEnumerable())
            {
                try
                {
                    int queueItemId = row.Field<int>("QueueItemId");
                    Console.WriteLine("Trying {0}", queueItemId);

                    // Disposing an uncommitted transaction rolls it back, so a failed DELETE
                    // also undoes the claim INSERT.
                    using (SqlTransaction transaction = _SqlConnection.BeginTransaction())
                    {
                        using (SqlCommand insert = new SqlCommand("INSERT INTO [QueueSample].[dbo].[ProcessedQueueItems] ([QueueItemId],[Action],[Parameters],[MachineName],[Start],[End]) VALUES(@QueueItemId,@Action,@Parameters,@MachineName,@Start,null)", _SqlConnection, transaction))
                        {
                            insert.Parameters.AddWithValue("@QueueItemId", queueItemId);
                            insert.Parameters.AddWithValue("@Action", row.Field<string>("Action"));
                            insert.Parameters.AddWithValue("@Parameters", row.Field<string>("Parameters"));
                            insert.Parameters.AddWithValue("@MachineName", GUID.ToString());
                            insert.Parameters.AddWithValue("@Start", DateTime.Now);

                            // Fails with a primary key violation when another worker already
                            // claimed this item.
                            insert.ExecuteNonQuery();
                        }

                        using (SqlCommand delete = new SqlCommand("DELETE FROM [QueueSample].[dbo].[QueueItems] WHERE QueueItemId = @QueueItemId", _SqlConnection, transaction))
                        {
                            delete.Parameters.AddWithValue("@QueueItemId", queueItemId);
                            delete.ExecuteNonQuery();
                        }

                        transaction.Commit();
                    }

                    return queueItemId;
                }
                catch (Exception ex)
                {
                    // Best effort: report and move on to the next candidate.
                    Console.WriteLine(ex.Message);
                }
            }

            return null;
        }

        // Handles the one-shot notification from SqlDependency. May run on a different thread
        // than the one that executed the watched command.
        private void OnChange(object sender, SqlNotificationEventArgs e)
        {
            Console.WriteLine("Change detected");
            SqlDependency dependency = sender as SqlDependency;

            // Notices are only a one shot deal
            // so remove the existing one so a new
            // one can be added
            if (dependency != null)
            {
                dependency.OnChange -= OnChange;
            }

            lock (_SyncRoot)
            {
                if (_Disposed)
                {
                    return;
                }

                if (dependency == null || ReferenceEquals(_SqlDependency, dependency))
                {
                    _SqlDependency = null;
                }
            }

            if (e.Type == SqlNotificationType.Subscribe)
            {
                // The subscription itself was rejected. Raising QueueChanged here would make
                // the subscriber re-register immediately and spin in a tight loop.
                Console.WriteLine("Query notification subscription failed: {0}", e.Info);
                return;
            }

            // Copy to a local so an unsubscribe on another thread cannot null the delegate
            // between the check and the call. Raised outside the lock.
            QueueChangeHandler handler = QueueChanged;
            if (handler != null)
            {
                handler();
            }
        }
    }
}

Here is the main class. Run a couple of instances of it. The idea of this little toy program is to dispatch each task to exactly one process. Each process gets the change notification and then tries to claim the top item in the queue by inserting it into another table. Because SQL Server enforces primary key uniqueness, exactly one process can successfully insert it. That’s how this toy program decides whether it successfully got a queue item.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace SqlDependencyConsole
{
    /// <summary>
    /// Console worker: claims queue items one at a time and simulates processing each with a
    /// random delay of up to two seconds. Intended to be run as several concurrent processes.
    /// </summary>
    class Program
    {
        private static JobQueue _JobQueue;
        private static Random _Random;

        // Random is not thread-safe, and CheckQueue can run on the main thread and on the
        // thread that delivers queue-change notifications at the same time.
        private static readonly object _RandomLock = new object();

        /// <summary>
        /// Subscribes to queue changes, drains whatever is already queued, then waits for
        /// notifications until the user presses Enter.
        /// </summary>
        static void Main(string[] args)
        {
            _Random = new Random();
            _JobQueue = new JobQueue();
            _JobQueue.QueueChanged += new QueueChangeHandler(OnQueueChanged);
            Console.WriteLine("Running...");
            CheckQueue();
            Console.ReadLine();
        }

        // Invoked by JobQueue when the queue table changed.
        private static void OnQueueChanged()
        {
            CheckQueue();
        }

        // Keeps claiming and "processing" items until a check comes back empty. Written as a
        // loop rather than recursion so a long queue cannot grow the call stack without bound.
        private static void CheckQueue()
        {
            int? queueItemId = _JobQueue.CheckQueue();
            while (queueItemId.HasValue)
            {
                Console.WriteLine("Processing {0}", queueItemId.Value);
                Thread.Sleep(NextDelayMilliseconds());
                queueItemId = _JobQueue.CheckQueue();
            }
        }

        // Returns the simulated processing time: a random value in [0, 2000) milliseconds.
        private static int NextDelayMilliseconds()
        {
            lock (_RandomLock)
            {
                return _Random.Next(2000);
            }
        }
    }
}