Quick test of the Azure WCF service
Azure now has support for input endpoints on Worker Roles! (Azure Tools Nov 2009) This is awesome as it greatly simplifies building WCF services in the cloud. David Aiken has written an excellent Channel 9 post on exactly how to do this (here).
As per usual, I’ve gone ahead and ported the most important components to F# and I have included a short list of some of the difficulties I had while doing so.
Lessons learned:
1. In Azure you cannot specify the input endpoints only in code; you also have to declare them in the properties for the role. A hard-coded address works in the Dev Fabric, but the host will not initialize in the cloud. (Thanks to Daniel C Wang.)
Wrong:
let baseAddress = Uri("http://kapow.cloudapp.net:8080/service")
let myServiceHost = new ServiceHost((typeof<GreetingService>), [|baseAddress|])
Right (in C#):
using Microsoft.WindowsAzure.ServiceRuntime;
var listenAddress = RoleEnvironment
    .CurrentRoleInstance
    .InstanceEndpoints["<name of your input endpoint>"]
    .IPEndpoint;
string uri = String.Format("http://{0}", listenAddress);
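Since the rest of this post is in F#, here is a minimal F# sketch of the same idea. RoleEnvironment is only available inside an Azure role, so the sketch assumes a hand-built IPEndPoint (the address 10.0.0.4:8080 is made up) in place of the value the role's endpoint would supply. The helper name buildServiceUri is hypothetical as well.

```fsharp
open System
open System.Net

// Stand-in for the IPEndpoint of the role's input endpoint.
// The address and port here are invented for illustration only.
let listenAddress = IPEndPoint(IPAddress.Parse("10.0.0.4"), 8080)

// Hypothetical helper: builds the service address from whatever endpoint
// the fabric hands out, instead of hard-coding a host name.
let buildServiceUri (scheme: string) (endpoint: IPEndPoint) (path: string) =
    Uri(sprintf "%s://%O/%s" scheme endpoint path)

let serviceUri = buildServiceUri "http" listenAddress "service"
printfn "%O" serviceUri
```

In a real role the only change should be where listenAddress comes from; the rest of the address is derived from it.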
2. When called from F#, TryGetValue can return its result as a 2-tuple instead of through a by-ref (out) parameter – this makes it a lot more F# friendly. (from here)
This C#:
SessionInformation session; bool exists = sessions.TryGetValue(sessionId, out session);
Becomes this:
let (exists,session) = sessions.TryGetValue(sessionId)
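The tuple form also works directly in a pattern match, which avoids ever touching the value when the key is missing. A small self-contained sketch; the sessions table and the describe function are hypothetical and exist only for this illustration:

```fsharp
open System.Collections.Generic

// Hypothetical session table: session id -> user name.
let sessions = Dictionary<string, string>()
sessions.Add("session-1", "Matt")

// Match on the (found, value) tuple that F# produces for the out parameter.
let describe (sessionId: string) =
    match sessions.TryGetValue(sessionId) with
    | true, userName -> sprintf "Session %s belongs to %s" sessionId userName
    | false, _ -> sprintf "Session %s is not registered" sessionId

printfn "%s" (describe "session-1")
printfn "%s" (describe "missing")
```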
3. KnownTypes – By default, you cannot pass a subclass of a data contract class where the contract expects its base class; the subclass has to be declared on the base class with a KnownType attribute. This took me a while to figure out. (From Programming WCF Services – O’Reilly)
In C#:
[DataContract]
[KnownType(typeof(Customer))]
class Contact
{..}
[DataContract]
class Customer : Contact
{..}
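In F# the same declaration needs the two types to be in one recursive group (type ... and ...), because the attribute on the base class refers to a type defined after it. The sketch below is an assumption-laden illustration rather than code from the service: the Contact and Customer members are invented, roundTrip is a hypothetical helper, and it uses DataContractSerializer directly instead of a WCF channel. On older .NET Framework scripts a reference to System.Runtime.Serialization may be needed.

```fsharp
open System.IO
open System.Runtime.Serialization

[<DataContract>]
[<KnownType(typeof<Customer>)>]
type Contact() =
    [<DataMember>]
    member val Name = "" with get, set

and [<DataContract>] Customer() =
    inherit Contact()
    [<DataMember>]
    member val CustomerNumber = 0 with get, set

// Hypothetical helper: serializes against the base contract and reads it back.
// Without the KnownType attribute, writing a Customer here would be rejected.
let roundTrip (contact: Contact) : Contact =
    let serializer = DataContractSerializer(typeof<Contact>)
    use stream = new MemoryStream()
    serializer.WriteObject(stream, contact)
    stream.Position <- 0L
    serializer.ReadObject(stream) :?> Contact

let original = Customer(Name = "Ann", CustomerNumber = 7)
let copy = roundTrip (original :> Contact)
printfn "Round-tripped type: %s" (copy.GetType().Name)
```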
4. You have to take down an Azure deployment in order to change its input endpoints, so there are no in-place upgrades with new endpoints. The Dr. Watson error for this wasn’t very clear.
Things I still can’t figure out:
I am assuming that the ICommunicationObject interface is added by an attribute, but I just can’t cast to it: I get an invalid cast error at runtime. I need to do this, so if anyone knows how, please let me know.
In C#:
((ICommunicationObject)client).Abort();
In F#: (from here)
(box client :?> ICommunicationObject).Abort()
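While the cause is unknown, a type-test pattern at least avoids the runtime exception, because the cast only happens when the object really implements the interface. The sketch below uses a hypothetical IAbortable interface as a stand-in for ICommunicationObject so that it runs without a WCF reference; FakeCallbackChannel, PlainSession and tryAbort are likewise invented for illustration. It may also be worth checking whether the object being cast is the callback channel itself rather than the session object that holds it, though that is a guess and has not been verified against this service.

```fsharp
// Hypothetical stand-in for System.ServiceModel.ICommunicationObject.
type IAbortable =
    abstract Abort : unit -> unit

// Invented type that implements the stand-in interface.
type FakeCallbackChannel() =
    let mutable aborted = false
    member this.Aborted = aborted
    interface IAbortable with
        member this.Abort() = aborted <- true

// Invented type that does not implement it.
type PlainSession(userName: string) =
    member this.UserName = userName

// Aborts the candidate only if it implements IAbortable;
// returns whether the abort was performed.
let tryAbort (candidate: obj) =
    match candidate with
    | :? IAbortable as channel ->
        channel.Abort()
        true
    | _ -> false

let channel = FakeCallbackChannel()
printfn "%b" (tryAbort (box channel))
printfn "%b" (tryAbort (box (PlainSession "Matt")))
```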
The Code:
The contract:
namespace AzureTalk.Contract
open System
open System.Collections.Generic
open System.ServiceModel
open System.Runtime.Serialization
[<DataContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10:schemas")>]
[<KnownType(typeof<SessionInformation>)>]
type ClientInformation =
    [<DataMember>]
    val mutable SessionId : string
    [<DataMember>]
    val mutable UserName : string
    [<DataMember>]
    val mutable RoleId : string
    [<DataMember>]
    val mutable IsActive : bool
    new(sessionId, userName, roleId) =
        {
            SessionId = sessionId
            UserName = userName
            RoleId = roleId
            IsActive = true
        }
and [<DataContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10")>]
    SessionInformation =
        inherit ClientInformation
        [<IgnoreDataMember>]
        val mutable Callback : IClientNotification option
        new(sessionId, userName, roleId, isActive, callback) =
            { inherit ClientInformation(sessionId, userName, roleId)
              Callback = callback
            }
and [<ServiceContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10" )>]
    IClientNotification =
        [<OperationContract(IsOneWay = true)>]
        abstract DeliverMessage : message:string -> fromSessionId:string -> toSessionId:string -> Unit
        [<OperationContract(IsOneWay = true)>]
        abstract UpdateClientList : clientInfo:ClientInformation -> Unit
[<ServiceContract(
    Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10",
    CallbackContract = typeof<IClientNotification>,
    SessionMode = SessionMode.Required)>]
type IChatService =
    [<OperationContract(IsInitiating = true)>]
    abstract Register : userName:string -> ClientInformation
    [<OperationContract(IsInitiating = false)>]
    abstract SendMessage : message:string -> sessionId:string -> Unit
    [<OperationContract(IsInitiating = false)>]
    abstract GetConnectedClients : Unit -> IEnumerable<ClientInformation>
The server:
namespace WorkerRole1
open System
open System.Diagnostics
open System.Linq
open System.Net
open System.Threading
open Microsoft.WindowsAzure.Diagnostics
open Microsoft.WindowsAzure.ServiceRuntime
open System.Runtime.Serialization
open System.Collections.Generic
open System.ServiceModel
open AzureTalk.Contract
type internal SessionManager() =
    static let sessionLock = new ReaderWriterLockSlim()
    // Runs f while holding the read lock.
    static let read (f:Unit->'a) =
        sessionLock.EnterReadLock()
        try
            f()
        finally
            sessionLock.ExitReadLock()
    // Runs f while holding the write lock.
    static let write (f:Unit->'a) =
        sessionLock.EnterWriteLock()
        try
            f()
        finally
            sessionLock.ExitWriteLock()
    static let sessions = new Dictionary<string, SessionInformation>()
    static member TryGetSession(sessionId:string) = read (fun() -> sessions.TryGetValue(sessionId))
    // Returns (existed, session): existed is true when the session was already registered.
    static member CreateOrUpdateSession (sessionId:string) (userName:string) (roleId:string) (callback:IClientNotification option) =
        write (fun() ->
            let (existingSession, session) = sessions.TryGetValue(sessionId)
            if (not existingSession) then
                let session = SessionInformation(sessionId, userName, roleId, true, callback)
                sessions.Add(sessionId, session)
                (existingSession, session)
            else
                session.SessionId <- sessionId
                session.UserName <- userName
                session.RoleId <- roleId
                session.Callback <- callback
                session.IsActive <- true
                (existingSession, session))
    static member RemoveSession (sessionId:string) =
        write (fun() ->
            let (exists,session) = sessions.TryGetValue(sessionId)
            if (exists) then
                session.IsActive <- false
                sessions.Remove(sessionId) |> ignore)
    // Seq.filter is lazy, so the array is built inside the read lock
    // rather than after the lock has been released.
    static member GetActiveSessions() =
        read (fun() -> sessions.Values |> Seq.filter (fun s -> s.IsActive) |> Seq.toArray)
///Implementation of the WCF chat service
[<ServiceBehavior(
    InstanceContextMode = InstanceContextMode.Single,
    ConcurrencyMode = ConcurrencyMode.Multiple,
#if DEBUG
    IncludeExceptionDetailInFaults = true,
#else
    IncludeExceptionDetailInFaults = false,
#endif
    AddressFilterMode = AddressFilterMode.Any)>]
type ChatService() =
    let log message kind = Trace.WriteLine(message, kind)
    member this.NotifyConnectedClients (clientInfo:ClientInformation) =
        SessionManager.GetActiveSessions()
        |> Array.iter (fun client ->
            try
                match client.Callback with
                | Some(callback)-> callback.UpdateClientList(clientInfo)
                | None -> ()
            with
            | :? TimeoutException as e ->
                log (sprintf "Unable to notify client '%s'. The service operation timed out. '%s'" client.UserName e.Message) "Error"
                //(box client :?> ICommunicationObject).Abort()
                client.Callback <- None
            | :? CommunicationException as e ->
                log (sprintf "Unable to notify client '%s'. There was a communication problem. '%s' - '%s'" client.UserName e.Message e.StackTrace) "Error"
                //(box client :?> ICommunicationObject).Abort()
                client.Callback <- None)
    interface IClientNotification with
        member this.DeliverMessage message fromSessionId toSessionId =
            let (fromExists,fromSession)= SessionManager.TryGetSession(fromSessionId)
            let (toExists, toSession) = SessionManager.TryGetSession(toSessionId)
            if (fromExists && toExists) then
                match toSession.Callback with
                | Some(callback) ->
                    callback.DeliverMessage message fromSessionId toSessionId
                    log (sprintf "Message '%s' sent from '%s' to '%s'." message fromSession.UserName toSession.UserName) "Information"
                | None -> ()
        member this.UpdateClientList clientInfo =
            if clientInfo.IsActive
            then
                // CreateOrUpdateSession returns true when the session already existed.
                let (existing, session) = SessionManager.CreateOrUpdateSession clientInfo.SessionId clientInfo.UserName clientInfo.RoleId None
                if not existing then log (sprintf "Remote session '%s' by user '%s' has been opened in role '%s'." session.SessionId session.UserName session.RoleId) "Information"
            else
                SessionManager.RemoveSession(clientInfo.SessionId)
                log (sprintf "Remote session '%s' by user '%s' has been closed in role '%s'." clientInfo.SessionId clientInfo.UserName clientInfo.RoleId) "Information"
            this.NotifyConnectedClients clientInfo
    interface IChatService with
        member this.Register userName =
            let roleId = RoleEnvironment.CurrentRoleInstance.Id
            let sessionId = OperationContext.Current.SessionId
            let callback = OperationContext.Current.GetCallbackChannel<IClientNotification>()
            let (existing, session) = SessionManager.CreateOrUpdateSession sessionId userName roleId (Some(callback))
            if (not existing)
            then
                OperationContext.Current.Channel.Closed.Add(fun e ->
                    SessionManager.RemoveSession sessionId
                    this.NotifyConnectedClients session
                    log (sprintf "Session '%s' by user '%s' has been closed in role '%s'." sessionId userName roleId) "Information")
                log (sprintf "Session '%s' by user '%s' has been opened in role '%s'." sessionId userName roleId) "Information"
                this.NotifyConnectedClients session
            ClientInformation(sessionId, userName, roleId)
        member this.SendMessage message sessionId =
            let fromSessionId = OperationContext.Current.SessionId
            (this :> IClientNotification).DeliverMessage message fromSessionId sessionId
        member this.GetConnectedClients() =
            (SessionManager.GetActiveSessions()
            |> Seq.map (fun session -> ClientInformation(session.SessionId, session.UserName, session.RoleId)))
type WorkerRole() =
    inherit RoleEntryPoint()
    let log message kind = Trace.WriteLine(message, kind)
    // Starts the host; on a host fault, aborts and retries until the retry count is used up.
    let rec startChatService (retries:int) =
        if (retries = 0)
        then
            RoleEnvironment.RequestRecycle()
        else
            log "Start chat service host..." "Information"
            let serviceHost = new ServiceHost(typeof<ChatService>)
            serviceHost.Faulted.Add(fun e ->
                log (String.Format("Host fault occurred. Aborting and restarting the host. Retry count: {0}", retries)) "Error"
                serviceHost.Abort()
                startChatService(retries - 1))
            let binding = new NetTcpBinding(SecurityMode.None)
            let externalEndPoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints.["ChatService"]
            serviceHost.AddServiceEndpoint(typeof<IChatService>, binding, (String.Format("net.tcp://{0}/ChatService", externalEndPoint.IPEndpoint))) |> ignore
            try
                serviceHost.Open()
                log "Chat service host started successfully." "Information"
            with
            | :? TimeoutException as timeoutException -> log (sprintf "The service operation timed out. %s" timeoutException.Message) "Error"
            | :? CommunicationException as communicationException -> log (sprintf "There was a communication problem. %s" communicationException.Message) "Error"
    override wr.Run() =
        log "Worker Process entry point called" "Information"
        startChatService 3
        while(true) do
            Thread.Sleep(300000)
            log "Working" "Information"
    override wr.OnStart() =
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit <- 12
        DiagnosticMonitor.Start("DiagnosticsConnectionString") |> ignore
        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
        RoleEnvironment.Changing.Add(fun e ->
            // If a configuration setting is changing
            if e.Changes |> Seq.exists (fun change -> change :? RoleEnvironmentConfigurationSettingChange) then
                // Set e.Cancel to true to restart this role instance
                e.Cancel <- true)
        base.OnStart()
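The read and write helpers in SessionManager above are a pattern that can be tried out without Azure or WCF. A minimal sketch, assuming a plain counter table in place of the session dictionary; tableLock, table, increment and snapshot are hypothetical names used only here. One detail worth noting: anything lazy (such as a Seq.filter or Seq.map pipeline) should be turned into a list or array inside the lock, otherwise the enumeration runs after the lock has been released.

```fsharp
open System.Collections.Generic
open System.Threading

// Hypothetical shared state guarded by a reader/writer lock.
let tableLock = new ReaderWriterLockSlim()
let table = Dictionary<string, int>()

// Runs f while holding the read lock.
let read (f: unit -> 'a) =
    tableLock.EnterReadLock()
    try f () finally tableLock.ExitReadLock()

// Runs f while holding the write lock.
let write (f: unit -> 'a) =
    tableLock.EnterWriteLock()
    try f () finally tableLock.ExitWriteLock()

// Adds one to the counter stored under key and returns the new value.
let increment (key: string) =
    write (fun () ->
        let (found, current) = table.TryGetValue(key)
        let next = (if found then current else 0) + 1
        table.[key] <- next
        next)

// Copies the table into a list while the read lock is still held.
let snapshot () =
    read (fun () -> table |> Seq.map (fun kv -> (kv.Key, kv.Value)) |> Seq.toList)

increment "matt" |> ignore
increment "matt" |> ignore
printfn "%A" (snapshot ())
```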
The client:
module AzureClient.Client
open System
open System.Collections.Generic
open System.ServiceModel
open System.Runtime.Serialization
open System.Windows
open System.Threading
open AzureTalk.Contract
type IChatServiceChannel =
    interface
        inherit IChatService
        inherit IClientChannel
    end
type ClientErrorEventArgs() =
    inherit EventArgs()
    let mutable message = ""
    let mutable cancel = false
    member this.Message with get() = message and set(x:string) = message <- x
    member this.Cancel with get() = cancel and set(x:bool) = cancel <- x
type ChatClient(callbackObject:IClientNotification,endpoint:string) as this =
    let binding = new NetTcpBinding(SecurityMode.None, false)
    let factory = new DuplexChannelFactory<IChatServiceChannel>(callbackObject, binding, endpoint)
    let error = new Event<ChatClient*ClientErrorEventArgs>()
    let onError message = error.Trigger(this, new ClientErrorEventArgs(Message = message, Cancel = false))
    let mutable _channel = None
    member this.Error = error.Publish
    // Returns the cached channel, creating a new one if there is none or it has faulted.
    member this.Channel
        with get() =
            let getChannel() =
                let channel = factory.CreateChannel()
                channel.Faulted.AddHandler(EventHandler(fun sender e ->
                    channel.Abort()
                    _channel <- None
                    System.Windows.MessageBox.Show("Channel Faulted") |> ignore
                ))
                channel
            let channel =
                match _channel with
                | None -> getChannel()
                | Some(x:IChatServiceChannel) when x.State = CommunicationState.Faulted -> getChannel()
                | Some(x:IChatServiceChannel) -> x
            _channel <- Some(channel)
            channel
    member this.Close() =
        try
            match _channel with | None -> () | Some(x:IChatServiceChannel) -> x.Close()
        with
        | :? TimeoutException as e -> this.Channel.Abort()
        | :? CommunicationException as e -> this.Channel.Abort()
    interface IChatService with
        member this.Register userName = this.Channel.Register(userName)
        member this.SendMessage message toSessionId = this.Channel.SendMessage message toSessionId
        member this.GetConnectedClients() = this.Channel.GetConnectedClients()
type Client() as this =
    let mutable userList = Seq.empty<ClientInformation>
    let endpoint = "net.tcp://localhost:3030/ChatService"
    let chatClient = new ChatClient(this,endpoint)
    let context = SynchronizationContext()
    let sync(f:obj->unit) = context.Post(SendOrPostCallback(f), null)
    let showMsg msg = System.Windows.MessageBox.Show(msg) |> ignore
    let showErr err = System.Windows.MessageBox.Show(err) |> ignore
    do
        chatClient.Error.AddHandler(fun _ (_,ea) -> showMsg ea.Message)
    interface IClientNotification with
        member this.DeliverMessage message fromSessionId toSessionId =
            sync(fun state ->
                match (userList |> Seq.tryFind(fun userInfo -> userInfo.SessionId = fromSessionId)) with
                | Some(fromSession) -> showMsg (sprintf "Message: %s \n From: %s" message fromSession.UserName)
                | None -> showErr (sprintf "A message was received from an unknown sender. The session ID '%s' is not registered." fromSessionId)
            )
        member this.UpdateClientList(session) =
            sync(fun state ->
                match (userList |> Seq.tryFind(fun userInfo -> userInfo.SessionId = session.SessionId)) with
                | Some(client) ->
                    if session.IsActive
                    then
                        client.UserName <- session.UserName
                        client.RoleId <- session.RoleId
                    else
                        // Keep everyone except the client that has left.
                        userList <- userList |> Seq.filter(fun u -> u <> client)
                        showMsg (sprintf "User '%s' has left the chat." client.UserName)
                | None ->
                    if session.IsActive
                    then
                        userList <- Seq.append ([session] |> List.toSeq) userList
                        showMsg (sprintf "User '%s' has joined the chat." session.UserName)
            )
    member this.Register userName = chatClient.Channel.Register(userName)
    member this.SendMessage message toSessionId = chatClient.Channel.SendMessage message toSessionId
    member this.GetConnectedClients() = chatClient.Channel.GetConnectedClients()
let client = new Client()
let clientInformation = client.Register "Matt"
client.GetConnectedClients() |> Seq.iter(fun c -> client.SendMessage "Hello from Matt" c.SessionId)
Console.Read() |> ignore
Hello,
Have you successfully run a WCF service with an http input endpoint in a worker role in the cloud? When I try this, I get System.ServiceModel.AddressAccessDeniedException: HTTP could not register URL http://+:20000/. Your process does not have access rights to this namespace (see http://go.microsoft.com/fwlink/?LinkId=70353 for details).
G’day Tony,
I have used http bindings in the cloud without any trouble. I’ve never seen these issues. :/
Matt