Nov 23 2009
Quick test of the Azure WCF service

Azure now has support for input endpoints on Worker Roles! (Azure Tools Nov 2009) This is awesome as it greatly simplifies building WCF services in the cloud. David Aiken has written an excellent Channel 9 post on exactly how to do this (here).

As per usual, I’ve gone ahead and ported the most important components to F# and I have included a short list of some of the difficulties I had while doing so.

Lessons learned:

1. In Azure, you cannot just specify the input endpoints in code; you also have to declare them in the properties for the role, then look the address up at runtime. A hard-coded address will work in the Dev Fabric but will not initialize in the cloud. (thanks to Daniel C Wang)

Wrong:

 let baseAddress = Uri("http://kapow.cloudapp.net:8080/service")
 let myServiceHost = new ServiceHost((typeof<GreetingService>), [|baseAddress|])

Right (in C#):

using Microsoft.WindowsAzure.ServiceRuntime;

var listenAddress = RoleEnvironment
    .CurrentRoleInstance
    .InstanceEndpoints["<name of your input endpoint>"]
    .IPEndpoint;

string uri = String.Format("http://{0}", listenAddress);

2. TryGetValue can return the value in a 2-tuple instead of by ref – this makes it a lot more F# friendly. (from here)

This:

SessionInformation session;
bool exists = sessions.TryGetValue(sessionId, out session);

Becomes this:

let (exists,session) = sessions.TryGetValue(sessionId)
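
As a minimal, self-contained sketch of the tuple form: the dictionary below is a hypothetical stand-in for the session store (it maps a session ID to a user name rather than to a SessionInformation), and `describe` is an illustrative helper that is not part of the original code. It also shows that the returned tuple can be pattern-matched directly.

```fsharp
open System.Collections.Generic

// Hypothetical stand-in for the session dictionary used in this post.
let sessions = Dictionary<string, string>()
sessions.Add("session-1", "Matt")

// Tuple form: the trailing out parameter comes back as the second element.
let (exists, userName) = sessions.TryGetValue("session-1")

// The same call can be pattern-matched directly on the returned tuple.
let describe (sessionId: string) =
    match sessions.TryGetValue(sessionId) with
    | true, name -> sprintf "found %s" name
    | false, _ -> "not found"

printfn "%b %s" exists userName
printfn "%s" (describe "session-1")
printfn "%s" (describe "missing")
```

Note that when the key is missing, the second element is the default value for the type (null for reference types), so check the boolean before using it.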

3. KnownTypes – By default, you cannot use a subclass of a data contract class instead of its base class. This took me a while to figure out. (From Programming WCF Services – O’Reilly)

In C#:

[DataContract]
[KnownType(typeof(Customer))]
class Contact
{..}

[DataContract]
class Customer : Contact
{..}

4. You have to take down an Azure deployment in order to change the input endpoints, so there are no in-place upgrades with new endpoints. The Dr. Watson error for this wasn’t very clear.

Things I still can’t figure out:

I am assuming that the ICommunicationObject interface is added by an attribute, but I can’t cast to it: I get an invalid cast error at runtime. I need to do this, so if anyone knows how, please let me know.

In C#:

((ICommunicationObject)client).Abort();

In F#: (from here)

(box client :?> ICommunicationObject).Abort()
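
One possible explanation, offered as an assumption rather than a confirmed diagnosis: in the server's NotifyConnectedClients, `client` is a SessionInformation, not a channel. In WCF it is the callback proxy (the object returned by GetCallbackChannel) that can be cast to ICommunicationObject, and in this F# port that proxy is wrapped in an option, so it would need to be unwrapped before the type test. The sketch below reproduces the idea with hypothetical stand-in types (ICloseable for ICommunicationObject, INotify for the callback contract, Proxy and Session for the channel and the session) so that it runs without WCF.

```fsharp
open System

// Hypothetical stand-ins: ICloseable plays the role of ICommunicationObject,
// INotify plays the role of the IClientNotification callback contract.
type ICloseable =
    abstract Abort : unit -> unit

type INotify =
    abstract Ping : unit -> unit

// Stand-in for the channel proxy, which implements both interfaces.
type Proxy() =
    member val Aborted = false with get, set
    interface INotify with
        member this.Ping() = ()
    interface ICloseable with
        member this.Abort() = this.Aborted <- true

// Stand-in for SessionInformation: it only holds the callback.
type Session(callback: INotify option) =
    member val Callback = callback with get, set

let proxy = Proxy()
let session = Session(Some(proxy :> INotify))

// Casting the session object itself fails with an invalid cast.
let castSessionFails =
    try
        (box session :?> ICloseable).Abort()
        false
    with :? InvalidCastException -> true

// Unwrap the option first, then type-test the callback object.
let abortCallback (s: Session) =
    match s.Callback with
    | Some callback ->
        match box callback with
        | :? ICloseable as channel -> channel.Abort()
        | _ -> ()
    | None -> ()

abortCallback session
printfn "%b %b" castSessionFails proxy.Aborted
```

Using a `:?` pattern instead of a `:?>` cast means an object that does not implement the interface is skipped rather than raising an exception.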

The Code:

The contract:

namespace AzureTalk.Contract

open System
open System.Collections.Generic
open System.ServiceModel
open System.Runtime.Serialization

[<DataContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10:schemas")>]
[<KnownType(typeof<SessionInformation>)>]
type ClientInformation =
        [<DataMember>]
        val mutable SessionId : string

        [<DataMember>]
        val mutable UserName : string

        [<DataMember>]
        val mutable RoleId : string

        [<DataMember>]
        val mutable IsActive : bool

        new(sessionId, userName, roleId) =
            {
                SessionId = sessionId
                UserName = userName
                RoleId = roleId
                IsActive = true
            }

and [<DataContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10")>]
    SessionInformation =
        inherit ClientInformation

        [<IgnoreDataMember>]
        val mutable Callback : IClientNotification option

        new(sessionId, userName, roleId, isActive, callback) =
            {   inherit ClientInformation(sessionId, userName, roleId)
                Callback = callback
            }

and [<ServiceContract(Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10" )>]
     IClientNotification =

    [<OperationContract(IsOneWay = true)>]
    abstract DeliverMessage : message:string -> fromSessionId:string -> toSessionId:string -> Unit

    [<OperationContract(IsOneWay = true)>]
    abstract UpdateClientList : clientInfo:ClientInformation -> Unit

[<ServiceContract(
    Namespace = "urn:WindowsAzurePlatformKit:Labs:AzureTalk:2009:10",
    CallbackContract = typeof<IClientNotification>,
    SessionMode = SessionMode.Required)>]
type IChatService =

    [<OperationContract(IsInitiating = true)>]
    abstract Register : userName:string -> ClientInformation

    [<OperationContract(IsInitiating = false)>]
    abstract SendMessage : message:string -> sessionId:string -> Unit

    [<OperationContract(IsInitiating = false)>]
    abstract GetConnectedClients : Unit -> IEnumerable<ClientInformation>

The server:

namespace WorkerRole1

open System
open System.Diagnostics
open System.Linq
open System.Net
open System.Threading
open Microsoft.WindowsAzure.Diagnostics
open Microsoft.WindowsAzure.ServiceRuntime
open System.Runtime.Serialization
open System.Collections.Generic
open System.ServiceModel
open AzureTalk.Contract

type internal SessionManager() =
    static let sessionLock = new ReaderWriterLockSlim()
    static let read (f:Unit->'a) =
        sessionLock.EnterReadLock()
        try
            f()
        finally
            sessionLock.ExitReadLock()
    static let write (f:Unit->'a) =
        sessionLock.EnterWriteLock()
        try
            f()
        finally
            sessionLock.ExitWriteLock()
    static let sessions = new Dictionary<string, SessionInformation>()

    static member TryGetSession(sessionId:string) =  read (fun() -> sessions.TryGetValue(sessionId))

    static member CreateOrUpdateSession (sessionId:string) (userName:string) (roleId:string) (callback:IClientNotification option) =
        write (fun() ->

                let (existingSession, session) = sessions.TryGetValue(sessionId)
                if (not existingSession) then
                    let session = SessionInformation(sessionId, userName, roleId, true, callback)
                    sessions.Add(sessionId, session)
                    (existingSession, session)
                else
                    session.SessionId <- sessionId
                    session.UserName  <- userName
                    session.RoleId    <- roleId
                    session.Callback  <- callback
                    session.IsActive  <- true
                    (existingSession, session))

    static member RemoveSession (sessionId:string) =
        write (fun() ->
                    let (exists,session) = sessions.TryGetValue(sessionId)
                    if exists then
                        session.IsActive <- false
                        sessions.Remove(sessionId) |> ignore)

    // Materialize the array inside the read lock; Seq.filter is lazy, so
    // enumerating after the lock is released would race with writers.
    static member GetActiveSessions() =
        read (fun() -> sessions.Values |> Seq.filter (fun s -> s.IsActive) |> Seq.toArray)

///Implementation of the WCF chat service
[<ServiceBehavior(
    InstanceContextMode = InstanceContextMode.Single,
    ConcurrencyMode = ConcurrencyMode.Multiple,
#if DEBUG
    IncludeExceptionDetailInFaults = true,
#else
    IncludeExceptionDetailInFaults = false,
#endif
    AddressFilterMode = AddressFilterMode.Any)>]
type ChatService() =
    let log message kind = Trace.WriteLine(message, kind)

    member this.NotifyConnectedClients (clientInfo:ClientInformation) =
        SessionManager.GetActiveSessions()
        |> Array.iter (fun client ->
            try
                match client.Callback with
                | Some(callback)-> callback.UpdateClientList(clientInfo)
                | None -> ()

            with
            | :? TimeoutException as e ->
                log (sprintf "Unable to notify client '%s'. The service operation timed out. '%s'" client.UserName e.Message) "Error"
                //(box client :?> ICommunicationObject).Abort()
                client.Callback <- None

            | :? CommunicationException as e ->
                log (sprintf "Unable to notify client '%s'. There was a communication problem. '%s' - '%s'" client.UserName e.Message e.StackTrace) "Error"
                //(box client :?> ICommunicationObject).Abort()
                client.Callback <- None)

    interface IClientNotification with

        member this.DeliverMessage message fromSessionId toSessionId =
            let (fromExists,fromSession)= SessionManager.TryGetSession(fromSessionId)
            let (toExists, toSession) = SessionManager.TryGetSession(toSessionId)
            if (fromExists && toExists) then
                match toSession.Callback with
                | Some(callback) ->
                    callback.DeliverMessage message fromSessionId toSessionId
                    log (sprintf "Message '%s' sent from '%s' to '%s'." message fromSession.UserName toSession.UserName) "Information"
                |  None -> ()

        member this.UpdateClientList clientInfo =
            if clientInfo.IsActive
            then
                let (created, session) = SessionManager.CreateOrUpdateSession clientInfo.SessionId clientInfo.UserName clientInfo.RoleId None
                if created then log (sprintf "Remote session '%s' by user '%s' has been opened in role '%s'." session.SessionId session.UserName session.RoleId) "Information"
            else
                SessionManager.RemoveSession(clientInfo.SessionId)
                log (sprintf "Remote session '%s' by user '%s' has been closed in role '%s'." clientInfo.SessionId clientInfo.UserName clientInfo.RoleId) "Information"
            this.NotifyConnectedClients clientInfo

    interface IChatService with
        member this.Register userName =
            let roleId = RoleEnvironment.CurrentRoleInstance.Id
            let sessionId = OperationContext.Current.SessionId
            let callback = OperationContext.Current.GetCallbackChannel<IClientNotification>()

            let (existing, session) = SessionManager.CreateOrUpdateSession sessionId userName roleId (Some(callback))
            if (not existing)
                then
                    OperationContext.Current.Channel.Closed.Add(fun e ->
                        SessionManager.RemoveSession sessionId
                        this.NotifyConnectedClients session
                        log  (sprintf "Session '%s' by user '%s' has been closed in role '%s'." sessionId userName roleId)  "Information")
                    log (sprintf "Session '%s' by user '%s' has been opened in role '%s'." sessionId userName roleId) "Information"

            this.NotifyConnectedClients session
            ClientInformation(sessionId, userName, roleId)

        member this.SendMessage message sessionId =
            let fromSessionId = OperationContext.Current.SessionId
            (this :> IClientNotification).DeliverMessage message fromSessionId sessionId

        member this.GetConnectedClients() =
            (SessionManager.GetActiveSessions()
            |> Seq.map (fun session -> ClientInformation(session.SessionId, session.UserName, session.RoleId)))

type WorkerRole() =
    inherit RoleEntryPoint()

    let log message kind = Trace.WriteLine(message, kind)

    let rec startChatService (retries:int) =
        if (retries = 0)
            then
                RoleEnvironment.RequestRecycle();
            else
                log "Start chat service host..." "Information"

                let serviceHost = new ServiceHost(typeof<ChatService>)

                serviceHost.Faulted.Add(fun e ->
                    log (String.Format("Host fault occurred. Aborting and restarting the host. Retry count: {0}", retries)) "Error"
                    serviceHost.Abort()
                    startChatService(retries - 1))

                let binding = new NetTcpBinding(SecurityMode.None)

                let externalEndPoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints.["ChatService"]

                serviceHost.AddServiceEndpoint(typeof<IChatService>, binding, (String.Format("net.tcp://{0}/ChatService", externalEndPoint.IPEndpoint))) |> ignore

                try
                    serviceHost.Open()
                    log "Chat service host started successfully." "Information"
                with
                | :? TimeoutException as timeoutException -> log (sprintf "The service operation timed out. %s" timeoutException.Message) "Error"
                | :? CommunicationException as communicationException -> log (sprintf "There was a communication problem. %s" communicationException.Message) "Error"

    override wr.Run() =

        log "Worker Process entry point called" "Information"

        startChatService 3

        while(true) do
            Thread.Sleep(300000)
            log "Working" "Information"

    override wr.OnStart() =

        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit <- 12

        DiagnosticMonitor.Start("DiagnosticsConnectionString") |> ignore

        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
        RoleEnvironment.Changing.Add(fun e ->
            // If a configuration setting is changing
            if e.Changes |> Seq.exists (fun change -> change :? RoleEnvironmentConfigurationSettingChange) then
                // Set e.Cancel to true to restart this role instance
                e.Cancel <- true)

        base.OnStart()

The client:

module AzureClient.Client

open System
open System.Collections.Generic
open System.ServiceModel
open System.Runtime.Serialization
open System.Windows
open System.Threading
open AzureTalk.Contract

type IChatServiceChannel =
    interface IChatService
    interface IClientChannel

type ClientErrorEventArgs() =
    inherit EventArgs()

    let mutable message = ""
    let mutable cancel = false

    member this.Message with get() = message and set(x:string) = message <- x
    member this.Cancel with get() = cancel and set(x:bool) = cancel <- x

type ChatClient(callbackObject:IClientNotification,endpoint:string) as this =
    let binding = new NetTcpBinding(SecurityMode.None, false)
    let factory = new DuplexChannelFactory<IChatServiceChannel>(callbackObject, binding, endpoint)
    let error = new Event<ChatClient*ClientErrorEventArgs>()
    let onError message = error.Trigger(this, new ClientErrorEventArgs(Message = message, Cancel = false))

    let mutable _channel = None

    member this.Error = error.Publish

    member this.Channel
        with get() =
            let getChannel() =
                let channel = factory.CreateChannel()
                channel.Faulted.AddHandler(EventHandler(fun sender e ->
                    channel.Abort()
                    _channel <- None
                    System.Windows.MessageBox.Show("Channel Faulted") |> ignore
                ))
                channel
            let channel =
                match _channel with
                | None -> getChannel()
                | Some(x:IChatServiceChannel) when x.State = CommunicationState.Faulted -> getChannel()
                | Some(x:IChatServiceChannel) -> x
            _channel <- Some(channel)
            channel

    member this.Close() =
        match _channel with
        | None -> ()
        | Some(x:IChatServiceChannel) ->
            // Abort the existing channel if a graceful close fails;
            // going through this.Channel here could create a new one.
            try
                x.Close()
            with
            | :? TimeoutException -> x.Abort()
            | :? CommunicationException -> x.Abort()

    interface IChatService with
        member this.Register userName = this.Channel.Register(userName)
        member this.SendMessage message toUserName = this.Channel.SendMessage message toUserName
        member this.GetConnectedClients() = this.Channel.GetConnectedClients()

type Client() as this =
    let mutable userList = Seq.empty<ClientInformation>
    let endpoint = "net.tcp://localhost:3030/ChatService"
    let chatClient = new ChatClient(this,endpoint)
    let context = SynchronizationContext()
    let sync(f:obj->unit) = context.Post(SendOrPostCallback(f), null)
    let showMsg msg = System.Windows.MessageBox.Show(msg) |> ignore
    let showErr err = System.Windows.MessageBox.Show(err) |> ignore
    do
        chatClient.Error.AddHandler(fun _ (_,ea) ->  showMsg ea.Message)

    interface IClientNotification with
        member this.DeliverMessage message fromSessionId toSessionId =
            sync(fun state ->
                match (userList |> Seq.tryFind(fun userInfo -> userInfo.SessionId = fromSessionId)) with
                | Some(fromSession) -> showMsg (sprintf "Message: %s \n From: %s" message fromSession.UserName)
                | None -> showErr (sprintf "A message was received from an unknown sender. The session ID '%s' is not registered." fromSessionId)
                )

        member this.UpdateClientList(session) =
            sync(fun state ->
                match (userList |> Seq.tryFind(fun userInfo -> userInfo.SessionId = session.SessionId)) with
                | Some(client) ->

                    if session.IsActive
                    then
                        client.UserName <- session.UserName
                        client.RoleId <- session.RoleId
                    else
                        userList <- userList |>  Seq.filter(fun u -> u <> client)
                        showMsg (sprintf "User '%s' has left the chat." client.UserName)
                | None ->
                    if session.IsActive
                    then
                        userList <- Seq.append ([session] |> List.toSeq) userList
                        showMsg (sprintf "User '%s' has joined the chat." session.UserName)
                )
    member this.Register userName = chatClient.Channel.Register(userName)

    member this.SendMessage message toSessionId = chatClient.Channel.SendMessage message toSessionId

    member this.GetConnectedClients() = chatClient.Channel.GetConnectedClients()

let client = new Client()
let clientInformation = client.Register "Matt"

client.GetConnectedClients() |> Seq.iter(fun c -> client.SendMessage "Hello from Matt" c.SessionId)

Console.Read() |> ignore
Posted by Matt

2 Comments to “Azure F# Worker Role with WCF”

  1. Tony says:

    Hello,

    Have you successfully run a WCF service with an http input endpoint in a worker role in the cloud? When I try this, I get System.ServiceModel.AddressAccessDeniedException: HTTP could not register URL http://+:20000/. Your process does not have access rights to this namespace (see http://go.microsoft.com/fwlink/?LinkId=70353 for details).

  2. Matt says:

    G’day Tony,

    I have used http bindings in the cloud without any trouble. I’ve never seen these issues. :/

    Matt
