Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Activities / System / ServiceModel / Activities / Dispatcher / PersistenceProviderDirectory.cs / 1602392 / PersistenceProviderDirectory.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Activities.Dispatcher { using System.Activities; using System.Activities.DurableInstancing; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Runtime; using System.Runtime.DurableInstancing; using System.ServiceModel.Activities.Description; using System.Threading; using System.Transactions; using System.Xml.Linq; using System.Activities.Hosting; sealed class PersistenceProviderDirectory { readonly Activity workflowDefinition; readonly WorkflowServiceHost serviceHost; readonly InstanceStore store; readonly InstanceOwner owner; [Fx.Tag.SynchronizationObject(Blocking = false)] readonly DictionarykeyMap; readonly InstanceThrottle throttle; [Fx.Tag.Cache(typeof(PersistenceContext), Fx.Tag.CacheAttrition.ElementOnCallback, SizeLimit = "MaxConcurrentInstances")] Dictionary instanceCache; HashSet pipelinesInUse; bool aborted; // The PPD is not responsible for opening the Store, but it will abort it and close it. internal PersistenceProviderDirectory(InstanceStore store, InstanceOwner owner, IDictionary instanceMetadataChanges, Activity workflowDefinition, WorkflowServiceHost serviceHost, DurableConsistencyScope consistencyScope, int maxInstances) : this(workflowDefinition, serviceHost, consistencyScope, maxInstances) { Fx.Assert(store != null, "InstanceStore must be specified on PPD."); Fx.Assert(owner != null, "InstanceOwner must be specified on PPD."); this.store = store; this.owner = owner; this.InstanceMetadataChanges = instanceMetadataChanges; } internal PersistenceProviderDirectory(Activity workflowDefinition, WorkflowServiceHost serviceHost, int maxInstances) : this(workflowDefinition, serviceHost, DurableConsistencyScope.Local, maxInstances) { } PersistenceProviderDirectory(Activity workflowDefinition, WorkflowServiceHost serviceHost, DurableConsistencyScope consistencyScope, int maxInstances) { Fx.Assert(workflowDefinition != null, "Root Activity must be specified on PPD."); Fx.Assert(serviceHost != null, "WorkflowServiceHost must be specified on PPD."); Fx.AssertAndThrow(maxInstances > 0, "MaxInstance must be greater than zero on PPD."); this.workflowDefinition = workflowDefinition; this.serviceHost = serviceHost; ConsistencyScope = consistencyScope; MaxInstances = maxInstances; this.throttle = new InstanceThrottle(MaxInstances); this.pipelinesInUse = new HashSet (); this.keyMap = new Dictionary (); this.instanceCache = new Dictionary (); } public IDictionary InstanceMetadataChanges { get; private set; } public DurableConsistencyScope ConsistencyScope { get; private set; } public int MaxInstances { get; private set; } object ThisLock { get { return this.keyMap; } } public WorkflowServiceInstance InitializeInstance(Guid instanceId, PersistenceContext context, IDictionary instance, WorkflowCreationContext creationContext) { return WorkflowServiceInstance.InitializeInstance(context, instanceId, this.workflowDefinition, instance, creationContext, WorkflowSynchronizationContext.Instance, this.serviceHost); } // This should be called as part of the closing path. The caller should guarantee that // no LoadOrCreates are in progress or will be initialized after this, same with // AddAssociations or AddInstance. [Fx.Tag.Throws(typeof(OperationCanceledException), "The directory of loaded instances has been aborted. An abrupt shutdown of the service is in progress.")] public IEnumerable GetContexts() { lock (ThisLock) { ThrowIfClosedOrAborted(); // The ToList is for snapshotting within the lock. return this.instanceCache.Values.ToList(); } } // All PersistenceContexts are opened before they are returned. [Fx.Tag.InheritThrows(From = "EndLoad")] public IAsyncResult BeginLoad(InstanceKey key, ICollection associatedKeys, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) { if (key == null) { throw FxTrace.Exception.ArgumentNull("key"); } if (key.Value == Guid.Empty) { throw FxTrace.Exception.Argument("key", SR.InvalidKey); } return new LoadOrCreateAsyncResult(this, key, Guid.Empty, false, associatedKeys, transaction, false, timeout, callback, state); } [Fx.Tag.InheritThrows(From = "EndLoad")] public IAsyncResult BeginLoad(Guid instanceId, ICollection associatedKeys, Transaction transaction, bool loadAny, TimeSpan timeout, AsyncCallback callback, object state) { if (instanceId == Guid.Empty && !loadAny) { throw FxTrace.Exception.Argument("instanceId", SR.InvalidInstanceId); } Fx.Assert(!loadAny || instanceId == Guid.Empty, "instanceId must be Empty for loadAny!"); return new LoadOrCreateAsyncResult(this, null, instanceId, false, associatedKeys, transaction, loadAny, timeout, callback, state); } [Fx.Tag.Throws.Timeout("Instance may have been locked, keys may have been associated. (?!?)")] [Fx.Tag.Throws(typeof(InstancePersistenceException), "Instance wasn't locked, keys weren't associated.")] [Fx.Tag.Throws(typeof(CommunicationObjectAbortedException), "Instance store aborted")] [Fx.Tag.Throws(typeof(CommunicationObjectFaultedException), "Instance store faulted")] public PersistenceContext EndLoad(IAsyncResult result, out bool fromCache) { return LoadOrCreateAsyncResult.End(result, out fromCache); } [Fx.Tag.InheritThrows(From = "EndLoadOrCreate")] public IAsyncResult BeginLoadOrCreate(InstanceKey key, Guid suggestedId, ICollection associatedKeys, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) { if (key == null) { throw FxTrace.Exception.ArgumentNull("key"); } if (key.Value == Guid.Empty) { throw FxTrace.Exception.Argument("key", SR.InvalidKey); } return new LoadOrCreateAsyncResult(this, key, suggestedId, true, associatedKeys, transaction, false, timeout, callback, state); } [Fx.Tag.InheritThrows(From = "EndLoadOrCreate")] public IAsyncResult BeginLoadOrCreate(Guid instanceId, ICollection associatedKeys, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) { return new LoadOrCreateAsyncResult(this, null, instanceId, true, associatedKeys, transaction, false, timeout, callback, state); } [Fx.Tag.InheritThrows(From = "EndLoad")] public PersistenceContext EndLoadOrCreate(IAsyncResult result, out bool fromCache) { return LoadOrCreateAsyncResult.End(result, out fromCache); } public void Close() { bool needAbort = false; lock (ThisLock) { if (this.aborted) { ThrowIfClosedOrAborted(); } if (this.instanceCache != null) { if (this.instanceCache.Count > 0) { needAbort = true; } else { this.instanceCache = null; } } } if (needAbort) { Abort(); ThrowIfClosedOrAborted(); throw Fx.AssertAndThrow("Should have thrown due to abort."); } } public void Abort() { List contextsToAbort = null; HashSet pipelinesToAbort = null; lock (ThisLock) { this.aborted = true; if (this.instanceCache != null) { foreach (PersistenceContext context in this.instanceCache.Values.ToArray()) { DetachContext(context, ref contextsToAbort); } Fx.Assert(this.instanceCache.Count == 0, "All instances should have been detached."); Fx.Assert(this.keyMap.Count == 0, "All instances should have been removed from the key map."); this.instanceCache = null; } if (this.pipelinesInUse != null) { pipelinesToAbort = this.pipelinesInUse; this.pipelinesInUse = null; } } AbortContexts(contextsToAbort); if (pipelinesToAbort != null) { foreach (PersistencePipeline pipeline in pipelinesToAbort) { pipeline.Abort(); } } this.throttle.Abort(); } public Transaction GetTransactionForInstance(InstanceKey instanceKey) { Transaction result = null; PersistenceContext context; lock (ThisLock) { // It's okay if the instance doesn't exist. We will just return null for the transaction. if (this.keyMap.TryGetValue(instanceKey.Value, out context)) { result = context.LockingTransaction; if (result != null) { // Make a clone in case the caller ends up disposing the object. result = result.Clone(); } } } return result; } internal ReadOnlyCollection GetBookmarksForInstance(InstanceKey instanceKey) { ReadOnlyCollection result = null; PersistenceContext context; lock (ThisLock) { // It's okay if the instance doesn't exist. We will just return null. if (this.keyMap.TryGetValue(instanceKey.Value, out context)) { result = context.Bookmarks; } } return result; } internal bool TryAddAssociations(PersistenceContext context, IEnumerable keys, HashSet keysToAssociate, HashSet keysToDisassociate) { Fx.Assert(context != null, "TryAddAssociations cannot have a null context."); Fx.Assert(keys != null, "Cannot call TryAddAssociations with empty keys."); Fx.Assert(keysToAssociate != null, "Cannot call TryAddAssociations with null keysToAssociate."); // keysToDisassociate can be null if they should not be overridden by the new keys. List contextsToAbort = null; try { lock (ThisLock) { if (context.IsPermanentlyRemoved) { return false; } Fx.Assert(context.IsVisible, "Cannot call TryAddAssociations on an invisible context."); // In the case when there is no store, if key collision is detected, the current instance will be aborted later. // We should not add any of its keys to the keyMap. if (this.store == null) { foreach (InstanceKey key in keys) { PersistenceContext conflictingContext; if (!context.AssociatedKeys.Contains(key) && this.keyMap.TryGetValue(key.Value, out conflictingContext)) { throw FxTrace.Exception.AsError(new InstanceKeyCollisionException(null, context.InstanceId, key, conflictingContext.InstanceId)); } } } foreach (InstanceKey key in keys) { Fx.Assert(key.IsValid, "Cannot call TryAddAssociations with an invalid key."); if (context.AssociatedKeys.Contains(key)) { if (keysToDisassociate != null) { keysToDisassociate.Remove(key); } } else { Fx.AssertAndThrow(this.instanceCache != null, "Since the context must be visible, it must still be in the cache."); PersistenceContext contextToAbort; if (this.keyMap.TryGetValue(key.Value, out contextToAbort)) { Fx.Assert(this.store != null, "When there is no store, exception should have already been thrown before we get here."); DetachContext(contextToAbort, ref contextsToAbort); } this.keyMap.Add(key.Value, context); context.AssociatedKeys.Add(key); keysToAssociate.Add(key); } } return true; } } finally { AbortContexts(contextsToAbort); } } internal void RemoveAssociations(PersistenceContext context, IEnumerable keys) { Fx.Assert(context != null, "RemoveAssociation cannot have a null context."); Fx.Assert(keys != null, "Cannot call RemoveAssociation with empty keys."); lock (ThisLock) { if (context.IsPermanentlyRemoved) { return; } Fx.Assert(context.IsVisible, "Cannot remove associations from a context that's not visible."); foreach (InstanceKey key in keys) { if (context.AssociatedKeys.Remove(key)) { Fx.AssertAndThrow(this.instanceCache != null, "Since the context must be visible, it must still be in the cache."); Fx.Assert(this.keyMap[key.Value] == context, "Context's keys must be in the map."); this.keyMap.Remove(key.Value); } } } } // For transactional uses, call this method on commit. internal void RemoveInstance(PersistenceContext context) { RemoveInstance(context, false); } // For transactional uses, call this method on commit. internal void RemoveInstance(PersistenceContext context, bool permanent) { Fx.Assert(context != null, "RemoveInstance cannot have a null context."); lock (ThisLock) { if (permanent) { context.IsPermanentlyRemoved = true; } DetachContext(context); } } internal void ReleaseThrottle() { this.throttle.Exit(); } internal IAsyncResult BeginReserveThrottle(TimeSpan timeout, AsyncCallback callback, object state) { return new ReserveThrottleAsyncResult(this, timeout, callback, state); } internal void EndReserveThrottle(out bool ownsThrottle, IAsyncResult result) { if (result is CompletedAsyncResult) { ownsThrottle = true; } else { ownsThrottle = ReserveThrottleAsyncResult.End(result); } } void AbortContexts(List contextsToAbort) { if (contextsToAbort != null) { foreach (PersistenceContext contextToAbort in contextsToAbort) { contextToAbort.Abort(); } } } // See if the instance exists in our cache PersistenceContext LoadFromCache(InstanceKey key, Guid suggestedIdOrId, bool canCreateInstance) { PersistenceContext foundContext = null; if (key != null || suggestedIdOrId != Guid.Empty) { lock (ThisLock) { ThrowIfClosedOrAborted(); if (key == null) { this.instanceCache.TryGetValue(suggestedIdOrId, out foundContext); } else { this.keyMap.TryGetValue(key.Value, out foundContext); } // Done here to take advantage of the lock. Fx.Assert(this.instanceCache.Count <= MaxInstances, "Too many instances in PPD."); } } else { Fx.Assert(canCreateInstance, "Must be able to create an instance if not addressable."); } return foundContext; } InstancePersistenceCommand CreateLoadCommandHelper(InstanceKey key, out InstanceHandle handle, bool canCreateInstance, Guid suggestedIdOrId, ICollection associatedKeys, bool loadAny) { if (loadAny) { handle = this.store.CreateInstanceHandle(this.owner); return new TryLoadRunnableWorkflowCommand(); } else if (key != null) { LoadWorkflowByInstanceKeyCommand loadByKeyCommand; handle = this.store.CreateInstanceHandle(this.owner); if (canCreateInstance) { loadByKeyCommand = new LoadWorkflowByInstanceKeyCommand() { LookupInstanceKey = key.Value, AssociateInstanceKeyToInstanceId = suggestedIdOrId == Guid.Empty ? Guid.NewGuid() : suggestedIdOrId, AcceptUninitializedInstance = true, }; } else { loadByKeyCommand = new LoadWorkflowByInstanceKeyCommand() { LookupInstanceKey = key.Value, }; } InstanceKey lookupKeyToAdd = (canCreateInstance && key.Metadata != null && key.Metadata.Count > 0) ? key : null; if (associatedKeys != null) { foreach (InstanceKey keyToAssociate in associatedKeys) { if (keyToAssociate == key) { if (!canCreateInstance) { continue; } lookupKeyToAdd = null; } TryAddKeyToInstanceKeysCollection(loadByKeyCommand.InstanceKeysToAssociate, keyToAssociate); } } if (lookupKeyToAdd != null) { TryAddKeyToInstanceKeysCollection(loadByKeyCommand.InstanceKeysToAssociate, lookupKeyToAdd); } return loadByKeyCommand; } else { if (associatedKeys != null) { throw FxTrace.Exception.AsError(new InvalidOperationException(SR.NoAdditionalKeysOnInstanceIdLoad)); } handle = this.store.CreateInstanceHandle(this.owner, suggestedIdOrId == Guid.Empty ? Guid.NewGuid() : suggestedIdOrId); return new LoadWorkflowCommand() { AcceptUninitializedInstance = canCreateInstance, }; } } static void TryAddKeyToInstanceKeysCollection(IDictionary > instanceKeysToAssociate, InstanceKey keyToAdd) { Fx.Assert(instanceKeysToAssociate != null, "instanceKeysToAssociate cannot be null"); Fx.Assert(keyToAdd != null, "keyToAdd cannot be null"); if (instanceKeysToAssociate.ContainsKey(keyToAdd.Value)) { throw FxTrace.Exception.AsError(new InvalidOperationException(SR.DuplicateInstanceKeyExists(keyToAdd.Value))); } instanceKeysToAssociate.Add(keyToAdd.Value, keyToAdd.Metadata); } void DetachContext(PersistenceContext contextToAbort, ref List contextsToAbort) { if (contextsToAbort == null) { contextsToAbort = new List (); } contextsToAbort.Add(contextToAbort); DetachContext(contextToAbort); } void DetachContext(PersistenceContext contextToAbort) { if (contextToAbort.IsVisible) { Fx.Assert(this.instanceCache != null, "All contexts should not be visible if we are closed / aborted."); foreach (InstanceKey key in contextToAbort.AssociatedKeys) { Fx.Assert(this.keyMap[key.Value] == contextToAbort, "Context's key must be in the map."); this.keyMap.Remove(key.Value); } try { } finally { if (this.instanceCache.Remove(contextToAbort.InstanceId)) { contextToAbort.IsVisible = false; this.throttle.Exit(); } else { Fx.Assert("Context must be in the cache."); } } } } void ThrowIfClosedOrAborted() { if (this.instanceCache == null) { if (this.aborted) { throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DirectoryAborted)); } else { throw FxTrace.Exception.AsError(new ObjectDisposedException(GetType().Name)); } } } void RegisterPipelineInUse(PersistencePipeline pipeline) { lock (ThisLock) { if (this.aborted) { throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DirectoryAborted)); } this.pipelinesInUse.Add(pipeline); } } void UnregisterPipelineInUse(PersistencePipeline pipeline) { lock (ThisLock) { if (!this.aborted) { this.pipelinesInUse.Remove(pipeline); } } } class LoadOrCreateAsyncResult : AsyncResult { static Action onComplete = new Action (OnComplete); static AsyncCompletion handleReserveThrottle = new AsyncCompletion(HandleReserveThrottle); static AsyncCompletion handleExecute = new AsyncCompletion(HandleExecute); static Action
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TypeProvider.cs
- AuthorizationRuleCollection.cs
- BamlTreeMap.cs
- DataGridViewCellStyleConverter.cs
- GiveFeedbackEventArgs.cs
- WebPartVerb.cs
- _CookieModule.cs
- DataGridViewTextBoxEditingControl.cs
- XmlSchemaSimpleTypeRestriction.cs
- OutputCacheSettingsSection.cs
- ChannelBinding.cs
- AmbientValueAttribute.cs
- BookmarkTable.cs
- ExpressionBuilderContext.cs
- TakeOrSkipQueryOperator.cs
- StaticExtensionConverter.cs
- AttributeProviderAttribute.cs
- FilterQueryOptionExpression.cs
- _NestedSingleAsyncResult.cs
- BlockingCollection.cs
- Calendar.cs
- MessagePropertyFilter.cs
- PackageFilter.cs
- VectorAnimation.cs
- ExpressionConverter.cs
- ObjectListItemCollection.cs
- DataBindingCollection.cs
- ClusterRegistryConfigurationProvider.cs
- Renderer.cs
- Msec.cs
- ActivityFunc.cs
- BlurEffect.cs
- DataGridViewLinkColumn.cs
- InputDevice.cs
- MdImport.cs
- DataSvcMapFileSerializer.cs
- AttachmentService.cs
- TypeSystemProvider.cs
- TraceRecord.cs
- DataTransferEventArgs.cs
- TokenBasedSet.cs
- Point3DCollectionConverter.cs
- OrderPreservingPipeliningSpoolingTask.cs
- ToolboxDataAttribute.cs
- LowerCaseStringConverter.cs
- MdiWindowListItemConverter.cs
- DataBoundControlAdapter.cs
- NameTable.cs
- TextOptionsInternal.cs
- DataGridViewTextBoxCell.cs
- Variant.cs
- MouseButtonEventArgs.cs
- MonthCalendar.cs
- DBSchemaRow.cs
- RequestBringIntoViewEventArgs.cs
- ExtendedProtectionPolicyElement.cs
- CommandConverter.cs
- ConfigurationPropertyCollection.cs
- DataSetSchema.cs
- SelectionBorderGlyph.cs
- PersistChildrenAttribute.cs
- AlignmentXValidation.cs
- ThrowHelper.cs
- SourceInterpreter.cs
- CatalogZoneBase.cs
- OneToOneMappingSerializer.cs
- complextypematerializer.cs
- Storyboard.cs
- PathFigureCollectionValueSerializer.cs
- FixedElement.cs
- WizardStepBase.cs
- FigureParagraph.cs
- DataGridCommandEventArgs.cs
- ListDataBindEventArgs.cs
- BmpBitmapDecoder.cs
- FontEmbeddingManager.cs
- LogEntry.cs
- nulltextnavigator.cs
- WindowProviderWrapper.cs
- DesignerWidgets.cs
- PerformanceCounterPermissionAttribute.cs
- XmlSchemaAttributeGroup.cs
- URLAttribute.cs
- Fx.cs
- ExpandedWrapper.cs
- TextFormattingConverter.cs
- StylusDevice.cs
- InputProcessorProfilesLoader.cs
- ToolStripButton.cs
- DrawItemEvent.cs
- ErrorHandler.cs
- XMLUtil.cs
- MeasurementDCInfo.cs
- PropertyGeneratedEventArgs.cs
- Trace.cs
- HtmlInputCheckBox.cs
- OdbcConnectionPoolProviderInfo.cs
- SchemaImporterExtensionsSection.cs
- DataGridColumnReorderingEventArgs.cs
- Token.cs