CorrelationService.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / Activities / LocalService / CorrelationService.cs / 1305376 / CorrelationService.cs

                            #region Using directives 

using System;
using System.Diagnostics;
using System.Collections.Generic; 
using System.Collections;
using System.Reflection; 
using System.Runtime.Serialization; 
using System.Workflow.ComponentModel;
using System.Workflow.Runtime; 
using System.Workflow.Runtime.Hosting;
using System.Runtime.Remoting.Messaging;
using System.Xml;
using System.Globalization; 

#endregion 
 
namespace System.Workflow.Activities
{ 
    internal interface ICorrelationProvider
    {
        ICollection ResolveCorrelationPropertyValues(Type interfaceType, string memberName, object[] methodArgs, bool provideInitializerTokens);
        bool IsInitializingMember(Type interfaceType, string memberName, object[] methodArgs); 
    }
 
    [AttributeUsageAttribute(AttributeTargets.Interface | AttributeTargets.Class, AllowMultiple = false, Inherited = true)] 
    internal sealed class CorrelationProviderAttribute : Attribute
    { 
        private Type correlationProviderType;

        internal CorrelationProviderAttribute(Type correlationProviderType)
        { 
            this.correlationProviderType = correlationProviderType;
        } 
 
        internal Type CorrelationProviderType
        { 
            get
            {
                return this.correlationProviderType;
            } 
        }
    } 
 
    internal static class CorrelationService
    { 
        internal static void Initialize(IServiceProvider context, Activity activity, Type interfaceType, string methodName, Guid instanceId)
        {
            if (activity == null)
                throw new ArgumentNullException("activity"); 
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType"); 
            if (methodName == null) 
                throw new ArgumentNullException("methodName");
 
            Subscribe(context,activity, interfaceType, methodName, null, instanceId);
            InitializeFollowers(context,interfaceType, methodName);
        }
 
        internal static bool Subscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener eventListener, Guid instanceId)
        { 
            if (activity == null) 
                throw new ArgumentNullException("activity");
            if (interfaceType == null) 
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");
 
            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService));
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName); 
            if (queueName != null) 
            {
                // initializer 
                WorkflowQueue queue = null;
                if (queueService.Exists(queueName))
                {
                    queue = queueService.GetWorkflowQueue(queueName); 
                    queue.Enabled = true;
                } 
                else 
                {
                    queue = queueService.CreateWorkflowQueue(queueName, true); 
                }

                if (eventListener != null)
                { 
                    queue.RegisterForQueueItemAvailable(eventListener, activity.QualifiedName);
                    WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "CorrelationService: activity '{0}' subscribing to QueueItemAvailable", activity.QualifiedName); 
                    return true; 
                }
                return false; 
            }

            SubscribeForCorrelationTokenInvalidation(activity, interfaceType, methodName, eventListener, instanceId);
            return false; 
        }
 
        internal static bool Unsubscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener eventListener) 
        {
            if (activity == null) 
                throw new ArgumentException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null) 
                throw new ArgumentNullException("methodName");
 
            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService)); 
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName);
            if (queueName != null) 
            {
                if (queueService.Exists(queueName))
                {
                    queueService.GetWorkflowQueue(queueName).UnregisterForQueueItemAvailable(eventListener); 
                    return true;
                } 
            } 
            return false;
        } 

        internal static IComparable ResolveQueueName(Activity activity, Type interfaceType, string methodName)
        {
            if (activity == null) 
                throw new ArgumentNullException("activity");
            if (interfaceType == null) 
                throw new ArgumentNullException("interfaceType"); 
            if (methodName == null)
                throw new ArgumentNullException("methodName"); 

            // resolver will check for an explicit correlation provider,
            // if none present this will return an uncorrelated provider.
            // note, an uncorrelated methodName will always be an initializer 
            if (CorrelationResolver.IsInitializingMember(interfaceType, methodName, null))
            { 
                ICollection corrvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, null, true); 
                return new EventQueueName(interfaceType, methodName, corrvalues);
            } 

            CorrelationToken reference = GetCorrelationToken(activity);
            if (!reference.Initialized)
                return null; 

            return new EventQueueName(interfaceType, methodName, reference.Properties); 
        } 

        internal static void InvalidateCorrelationToken(Activity activity, Type interfaceType, string methodName, object[] messageArgs) 
        {
            object correlationProvider = CorrelationResolver.GetCorrelationProvider(interfaceType);
            if (correlationProvider is NonCorrelatedProvider)
                return; 

            CorrelationToken reference = GetCorrelationToken(activity); 
            ICollection correlationvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, messageArgs, false); 

            if (!CorrelationResolver.IsInitializingMember(interfaceType, methodName, messageArgs)) 
            {
                if (!reference.Initialized)
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationNotInitialized, reference.Name, activity.QualifiedName));
                ValidateCorrelation(reference.Properties, correlationvalues, reference.Name,activity); 
                return;
            } 
 
            // invalidate correlation token if methodName is an initializer
            reference.Initialize(activity, correlationvalues); 
        }

        private static CorrelationToken GetCorrelationToken(Activity activity)
        { 
            DependencyProperty dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType());
            if (dependencyProperty == null) 
                dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType().BaseType); 
            CorrelationToken reference = activity.GetValue(dependencyProperty) as CorrelationToken;
            if (reference == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));

            CorrelationToken correlator = CorrelationTokenCollection.GetCorrelationToken(activity, reference.Name, reference.OwnerActivityName);
            if (correlator == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));
 
            return correlator; 
        }
 
        private static void ValidateCorrelation(ICollection initializerProperties, ICollection followerProperties, string memberName, Activity activity)
        {
            if (followerProperties == null && initializerProperties == null)
                return; 

            if (followerProperties == null || initializerProperties == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName,activity.QualifiedName)); 

            if (initializerProperties.Count != followerProperties.Count) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));

            IEnumerator initializerValues = initializerProperties.GetEnumerator();
            IEnumerator followerValues  = followerProperties.GetEnumerator(); 
            while (initializerValues.MoveNext() && followerValues.MoveNext())
            { 
                IComparable initializerValue = initializerValues.Current.Value as IComparable; 
                object followerValue = followerValues.Current.Value;
 
                if (initializerValue != null && (initializerValue.CompareTo(followerValue) != 0))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));
                else if (initializerValues.Current.Value == null && followerValue == null)
                    return; 
                else if (initializerValue == null && followerValue != null && !followerValue.Equals(initializerValues.Current.Value))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName)); 
            } 
        }
 
        private static void SubscribeForCorrelationTokenInvalidation(Activity activity, Type interfaceType, string followermethodName, IActivityEventListener eventListener, Guid instanceId)
        {
            CorrelationToken reference = GetCorrelationToken(activity);
            CorrelationTokenInvalidatedHandler dataChangeEventListener = new CorrelationTokenInvalidatedHandler(interfaceType, followermethodName, eventListener, instanceId); 
            reference.SubscribeForCorrelationTokenInitializedEvent(activity, dataChangeEventListener);
        } 
 
        private static void InitializeFollowers(IServiceProvider context, Type interfaceType, string followermethodName)
        { 
            if (CorrelationResolver.IsInitializingMember(interfaceType, followermethodName, null))
                return;

            EventInfo[] events = interfaceType.GetEvents(); 
            foreach (EventInfo e in events)
            { 
                CreateFollowerEntry(context, interfaceType, followermethodName, e.Name); 
            }
        } 

        private static void CreateFollowerEntry(IServiceProvider context, Type interfaceType, string followermethodName, string initializermethodName)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializermethodName, null)) 
                return;
 
            WorkflowQueuingService queueSvcs = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService)); 
            FollowerQueueCreator follower = new FollowerQueueCreator(followermethodName);
            WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "Creating follower {0} on initializer {1}", interfaceType.Name + followermethodName, interfaceType.Name + initializermethodName); 

            ICollection corrValues = CorrelationResolver.ResolveCorrelationValues(interfaceType, initializermethodName, null, true);
            EventQueueName key = new EventQueueName(interfaceType, initializermethodName, corrValues);
            WorkflowQueue initializerQueue = null; 
            if (queueSvcs.Exists(key))
            { 
                initializerQueue = queueSvcs.GetWorkflowQueue(key); 
            }
            else 
            {
                // traversed follower before initializer
                initializerQueue = queueSvcs.CreateWorkflowQueue(key, true);
                initializerQueue.Enabled = false; 
            }
 
            initializerQueue.RegisterForQueueItemArrived(follower); 
        }
 
        internal static void UninitializeFollowers(Type interfaceType, string initializer, WorkflowQueue initializerQueue)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializer, null))
                return; 

            EventInfo[] events = interfaceType.GetEvents(); 
            foreach (EventInfo e in events) 
            {
                string follower = e.Name; 
                if (!CorrelationResolver.IsInitializingMember(interfaceType, e.Name, null))
                    initializerQueue.UnregisterForQueueItemArrived(new FollowerQueueCreator(follower));
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
#region Using directives 

using System;
using System.Diagnostics;
using System.Collections.Generic; 
using System.Collections;
using System.Reflection; 
using System.Runtime.Serialization; 
using System.Workflow.ComponentModel;
using System.Workflow.Runtime; 
using System.Workflow.Runtime.Hosting;
using System.Runtime.Remoting.Messaging;
using System.Xml;
using System.Globalization; 

#endregion 
 
namespace System.Workflow.Activities
{ 
    internal interface ICorrelationProvider
    {
        ICollection ResolveCorrelationPropertyValues(Type interfaceType, string memberName, object[] methodArgs, bool provideInitializerTokens);
        bool IsInitializingMember(Type interfaceType, string memberName, object[] methodArgs); 
    }
 
    [AttributeUsageAttribute(AttributeTargets.Interface | AttributeTargets.Class, AllowMultiple = false, Inherited = true)] 
    internal sealed class CorrelationProviderAttribute : Attribute
    { 
        private Type correlationProviderType;

        internal CorrelationProviderAttribute(Type correlationProviderType)
        { 
            this.correlationProviderType = correlationProviderType;
        } 
 
        internal Type CorrelationProviderType
        { 
            get
            {
                return this.correlationProviderType;
            } 
        }
    } 
 
    internal static class CorrelationService
    { 
        internal static void Initialize(IServiceProvider context, Activity activity, Type interfaceType, string methodName, Guid instanceId)
        {
            if (activity == null)
                throw new ArgumentNullException("activity"); 
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType"); 
            if (methodName == null) 
                throw new ArgumentNullException("methodName");
 
            Subscribe(context,activity, interfaceType, methodName, null, instanceId);
            InitializeFollowers(context,interfaceType, methodName);
        }
 
        internal static bool Subscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener eventListener, Guid instanceId)
        { 
            if (activity == null) 
                throw new ArgumentNullException("activity");
            if (interfaceType == null) 
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");
 
            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService));
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName); 
            if (queueName != null) 
            {
                // initializer 
                WorkflowQueue queue = null;
                if (queueService.Exists(queueName))
                {
                    queue = queueService.GetWorkflowQueue(queueName); 
                    queue.Enabled = true;
                } 
                else 
                {
                    queue = queueService.CreateWorkflowQueue(queueName, true); 
                }

                if (eventListener != null)
                { 
                    queue.RegisterForQueueItemAvailable(eventListener, activity.QualifiedName);
                    WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "CorrelationService: activity '{0}' subscribing to QueueItemAvailable", activity.QualifiedName); 
                    return true; 
                }
                return false; 
            }

            SubscribeForCorrelationTokenInvalidation(activity, interfaceType, methodName, eventListener, instanceId);
            return false; 
        }
 
        internal static bool Unsubscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener eventListener) 
        {
            if (activity == null) 
                throw new ArgumentException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null) 
                throw new ArgumentNullException("methodName");
 
            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService)); 
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName);
            if (queueName != null) 
            {
                if (queueService.Exists(queueName))
                {
                    queueService.GetWorkflowQueue(queueName).UnregisterForQueueItemAvailable(eventListener); 
                    return true;
                } 
            } 
            return false;
        } 

        internal static IComparable ResolveQueueName(Activity activity, Type interfaceType, string methodName)
        {
            if (activity == null) 
                throw new ArgumentNullException("activity");
            if (interfaceType == null) 
                throw new ArgumentNullException("interfaceType"); 
            if (methodName == null)
                throw new ArgumentNullException("methodName"); 

            // resolver will check for an explicit correlation provider,
            // if none present this will return an uncorrelated provider.
            // note, an uncorrelated methodName will always be an initializer 
            if (CorrelationResolver.IsInitializingMember(interfaceType, methodName, null))
            { 
                ICollection corrvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, null, true); 
                return new EventQueueName(interfaceType, methodName, corrvalues);
            } 

            CorrelationToken reference = GetCorrelationToken(activity);
            if (!reference.Initialized)
                return null; 

            return new EventQueueName(interfaceType, methodName, reference.Properties); 
        } 

        internal static void InvalidateCorrelationToken(Activity activity, Type interfaceType, string methodName, object[] messageArgs) 
        {
            object correlationProvider = CorrelationResolver.GetCorrelationProvider(interfaceType);
            if (correlationProvider is NonCorrelatedProvider)
                return; 

            CorrelationToken reference = GetCorrelationToken(activity); 
            ICollection correlationvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, messageArgs, false); 

            if (!CorrelationResolver.IsInitializingMember(interfaceType, methodName, messageArgs)) 
            {
                if (!reference.Initialized)
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationNotInitialized, reference.Name, activity.QualifiedName));
                ValidateCorrelation(reference.Properties, correlationvalues, reference.Name,activity); 
                return;
            } 
 
            // invalidate correlation token if methodName is an initializer
            reference.Initialize(activity, correlationvalues); 
        }

        private static CorrelationToken GetCorrelationToken(Activity activity)
        { 
            DependencyProperty dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType());
            if (dependencyProperty == null) 
                dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType().BaseType); 
            CorrelationToken reference = activity.GetValue(dependencyProperty) as CorrelationToken;
            if (reference == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));

            CorrelationToken correlator = CorrelationTokenCollection.GetCorrelationToken(activity, reference.Name, reference.OwnerActivityName);
            if (correlator == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));
 
            return correlator; 
        }
 
        private static void ValidateCorrelation(ICollection initializerProperties, ICollection followerProperties, string memberName, Activity activity)
        {
            if (followerProperties == null && initializerProperties == null)
                return; 

            if (followerProperties == null || initializerProperties == null) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName,activity.QualifiedName)); 

            if (initializerProperties.Count != followerProperties.Count) 
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));

            IEnumerator initializerValues = initializerProperties.GetEnumerator();
            IEnumerator followerValues  = followerProperties.GetEnumerator(); 
            while (initializerValues.MoveNext() && followerValues.MoveNext())
            { 
                IComparable initializerValue = initializerValues.Current.Value as IComparable; 
                object followerValue = followerValues.Current.Value;
 
                if (initializerValue != null && (initializerValue.CompareTo(followerValue) != 0))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));
                else if (initializerValues.Current.Value == null && followerValue == null)
                    return; 
                else if (initializerValue == null && followerValue != null && !followerValue.Equals(initializerValues.Current.Value))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName)); 
            } 
        }
 
        private static void SubscribeForCorrelationTokenInvalidation(Activity activity, Type interfaceType, string followermethodName, IActivityEventListener eventListener, Guid instanceId)
        {
            CorrelationToken reference = GetCorrelationToken(activity);
            CorrelationTokenInvalidatedHandler dataChangeEventListener = new CorrelationTokenInvalidatedHandler(interfaceType, followermethodName, eventListener, instanceId); 
            reference.SubscribeForCorrelationTokenInitializedEvent(activity, dataChangeEventListener);
        } 
 
        private static void InitializeFollowers(IServiceProvider context, Type interfaceType, string followermethodName)
        { 
            if (CorrelationResolver.IsInitializingMember(interfaceType, followermethodName, null))
                return;

            EventInfo[] events = interfaceType.GetEvents(); 
            foreach (EventInfo e in events)
            { 
                CreateFollowerEntry(context, interfaceType, followermethodName, e.Name); 
            }
        } 

        private static void CreateFollowerEntry(IServiceProvider context, Type interfaceType, string followermethodName, string initializermethodName)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializermethodName, null)) 
                return;
 
            WorkflowQueuingService queueSvcs = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService)); 
            FollowerQueueCreator follower = new FollowerQueueCreator(followermethodName);
            WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "Creating follower {0} on initializer {1}", interfaceType.Name + followermethodName, interfaceType.Name + initializermethodName); 

            ICollection corrValues = CorrelationResolver.ResolveCorrelationValues(interfaceType, initializermethodName, null, true);
            EventQueueName key = new EventQueueName(interfaceType, initializermethodName, corrValues);
            WorkflowQueue initializerQueue = null; 
            if (queueSvcs.Exists(key))
            { 
                initializerQueue = queueSvcs.GetWorkflowQueue(key); 
            }
            else 
            {
                // traversed follower before initializer
                initializerQueue = queueSvcs.CreateWorkflowQueue(key, true);
                initializerQueue.Enabled = false; 
            }
 
            initializerQueue.RegisterForQueueItemArrived(follower); 
        }
 
        internal static void UninitializeFollowers(Type interfaceType, string initializer, WorkflowQueue initializerQueue)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializer, null))
                return; 

            EventInfo[] events = interfaceType.GetEvents(); 
            foreach (EventInfo e in events) 
            {
                string follower = e.Name; 
                if (!CorrelationResolver.IsInitializingMember(interfaceType, e.Name, null))
                    initializerQueue.UnregisterForQueueItemArrived(new FollowerQueueCreator(follower));
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK