DuplexChannelBinder.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Dispatcher / DuplexChannelBinder.cs / 1 / DuplexChannelBinder.cs

//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------

namespace System.ServiceModel.Dispatcher 
{
    using System; 
    using System.ServiceModel; 
    using System.ServiceModel.Security;
    using System.Collections; 
    using System.Collections.Generic;
    using System.Threading;
    using System.ServiceModel.Channels;
    using System.Xml; 
    using System.ServiceModel.Diagnostics;
    using System.Runtime.CompilerServices; 
 
    class DuplexChannelBinder : IChannelBinder
    { 
        // Channel that all sends and receives are delegated to.
        IDuplexChannel channel;
        // Maps incoming replies back to the request objects registered in RequestStarting.
        IRequestReplyCorrelator correlator;
        TimeSpan defaultCloseTimeout;
        TimeSpan defaultSendTimeout;
        // Backing field for IsFaulted. NOTE(review): nothing in this class assigns it
        // after this initializer - confirm whether OnFaulted was meant to set it.
        bool faulted = false;
        // Created lazily by the IdentityVerifier property getter.
        IdentityVerifier identityVerifier;
        // Set only by the IDuplexSessionChannel constructor overload.
        bool isSession;
        Uri listenUri;
        // Incremented in RequestStarting, decremented in RequestCompleting.
        int pending;
        // True while Request() is running its own receive loop on the calling thread.
        bool syncPumpEnabled;
        // Outstanding requests; created lazily by the Requests property and reset to
        // null by AbortRequests / RequestCompleting. The element type is IDuplexRequest
        // (AbortRequests copies the list into an IDuplexRequest[]).
        List<IDuplexRequest> requests;
        ChannelHandler channelHandler;
 
        // Creates a binder over a datagram-style duplex channel with no listen URI.
        internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator)
            : this(channel, correlator, (Uri)null)
        {
        }

        // Creates a binder over `channel`. Both `channel` and `correlator` are required;
        // `listenUri` may be null. The binder subscribes to the channel's Faulted event
        // so that outstanding requests are aborted when the channel faults.
        internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator, Uri listenUri)
        {
            if (channel == null)
            {
                DiagnosticUtility.DebugAssert("DuplexChannelBinder.DuplexChannelBinder: (channel != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel");
            }

            if (correlator == null)
            {
                DiagnosticUtility.DebugAssert("DuplexChannelBinder.DuplexChannelBinder: (correlator != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("correlator");
            }

            this.listenUri = listenUri;
            this.correlator = correlator;
            this.channel = channel;
            channel.Faulted += new EventHandler(this.OnFaulted);
        }

        // Creates a binder over a sessionful duplex channel with no listen URI.
        internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator)
            : this(channel, correlator, (Uri)null)
        {
        }

        // Creates a binder over a sessionful duplex channel. Delegates to the
        // IDuplexChannel overload and then records that the channel has a session.
        internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator, Uri listenUri)
            : this((IDuplexChannel)channel, correlator, listenUri)
        {
            isSession = true;
        }

        // The duplex channel this binder wraps.
        public IChannel Channel
        {
            get
            {
                return channel;
            }
        }

        // Close timeout handed to each DuplexRequestContext created by this binder.
        public TimeSpan DefaultCloseTimeout
        {
            get
            {
                return defaultCloseTimeout;
            }
            set
            {
                defaultCloseTimeout = value;
            }
        }
 
        // The handler that pumps this binder's channel. Debug builds assert that the
        // handler has been assigned before it is read, and that it is assigned only once;
        // neither condition is enforced beyond the assert.
        internal ChannelHandler ChannelHandler
        {
            get
            {
                ChannelHandler current = this.channelHandler;
                if (current != null)
                {
                    return current;
                }

                DiagnosticUtility.DebugAssert("DuplexChannelBinder.ChannelHandler: (channelHandler != null)");
                return current;
            }
            set
            {
                if (this.channelHandler == null)
                {
                    this.channelHandler = value;
                    return;
                }

                DiagnosticUtility.DebugAssert("DuplexChannelBinder.ChannelHandler: (channelHandler == null)");
                this.channelHandler = value;
            }
        }
 
        // Send timeout handed to each DuplexRequestContext created by this binder.
        public TimeSpan DefaultSendTimeout
        {
            get
            {
                return defaultSendTimeout;
            }
            set
            {
                defaultSendTimeout = value;
            }
        }
 
        // True when the binder was constructed over an IDuplexSessionChannel.
        public bool HasSession
        {
            get
            {
                return isSession;
            }
        }

        // Verifier used to check the identity of reply senders. The getter creates the
        // default verifier on first use; the setter rejects null.
        internal IdentityVerifier IdentityVerifier
        {
            get
            {
                if (this.identityVerifier != null)
                {
                    return this.identityVerifier;
                }

                this.identityVerifier = IdentityVerifier.CreateDefault();
                return this.identityVerifier;
            }
            set
            {
                if (value != null)
                {
                    this.identityVerifier = value;
                    return;
                }

                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value");
            }
        }
 
        // Returns the `faulted` flag.
        // NOTE(review): no code in this class sets `faulted` after its `false`
        // initializer, so this appears to always return false - confirm intent.
        internal bool IsFaulted
        {
            get
            {
                return faulted;
            }
        }

        // The listen URI supplied at construction (null when none was given).
        public Uri ListenUri
        {
            get
            {
                return listenUri;
            }
        }
 
        // Local address reported by the wrapped channel.
        public EndpointAddress LocalAddress
        {
            get
            {
                return channel.LocalAddress;
            }
        }

        // True when something is already receiving from the channel: either a
        // synchronous Request() is running its own receive loop, or the channel
        // handler has been registered.
        bool Pumping
        {
            get
            {
                return this.syncPumpEnabled
                    || (this.channelHandler != null && this.channelHandler.HasRegisterBeenCalled);
            }
        }

        // Remote address reported by the wrapped channel.
        public EndpointAddress RemoteAddress
        {
            get
            {
                return channel.RemoteAddress;
            }
        }
 
        // Outstanding requests, created on first access under the binder lock.
        // The element type is IDuplexRequest: AbortRequests copies this list into an
        // IDuplexRequest[] and RequestStarting adds IDuplexRequest instances to it.
        List<IDuplexRequest> Requests
        {
            get
            {
                lock (this.ThisLock)
                {
                    if (this.requests == null)
                    {
                        this.requests = new List<IDuplexRequest>();
                    }
                    return this.requests;
                }
            }
        }

        // Monitor object guarding `requests`, `pending` and `syncPumpEnabled`.
        // The binder instance itself is used as the lock; the nested request classes
        // take the same lock through parent.ThisLock.
        object ThisLock
        {
            get
            {
                return this;
            }
        }
 
        // Faulted-event handler for the wrapped channel (subscribed in the constructor).
        void OnFaulted(object sender, EventArgs e)
        {
            // The channel hit an unhandled failure. Abort every outstanding request so
            // that waiting callers are released and async callbacks are invoked.
            AbortRequests();
        }

        // Aborts the wrapped channel, then aborts every outstanding request.
        public void Abort()
        {
            channel.Abort();
            AbortRequests();
        }
 
        // Closes the wrapped channel and aborts every outstanding request.
        // Any exception thrown by the channel's Close propagates to the caller, but
        // the outstanding requests are aborted regardless.
        public void CloseAfterFault(TimeSpan timeout)
        {
            try
            {
                this.channel.Close(timeout);
            }
            finally
            {
                // Close can throw (for example on timeout). Without the finally the
                // pending requests would keep waiting for replies that can no longer
                // arrive on this channel.
                this.AbortRequests();
            }
        }
 
        // Aborts every outstanding request and drops the request list.
        // The list is snapshotted first because a request's Abort may re-enter
        // RequestCompleting, which mutates or clears `requests`.
        void AbortRequests()
        {
            lock (this.ThisLock)
            {
                if (this.requests != null)
                {
                    IDuplexRequest[] outstanding = this.requests.ToArray();
                    for (int i = 0; i < outstanding.Length; i++)
                    {
                        outstanding[i].Abort();
                    }
                }

                this.requests = null;
            }
        }

        // Builds the TimeoutException reported when no reply arrives within `timeout`.
        // The message names the channel's remote address, falling back to the local
        // address; when neither is available the address-less message is used.
        TimeoutException GetReceiveTimeoutException(TimeSpan timeout)
        {
            EndpointAddress address = this.channel.RemoteAddress ?? this.channel.LocalAddress;

            string text = (address != null)
                ? SR.GetString(SR.SFxRequestTimedOut2, address, timeout)
                : SR.GetString(SR.SFxRequestTimedOut1, timeout);

            return new TimeoutException(text);
        }
 
        // Offers an incoming message to the outstanding requests.
        // Returns true when the message carried a RelatesTo header and was consumed as
        // the reply to a pending request; false otherwise. A malformed RelatesTo header
        // is treated the same as a missing one.
        internal bool HandleRequestAsReply(Message message)
        {
            UniqueId relatesTo;
            try
            {
                relatesTo = message.Headers.RelatesTo;
            }
            catch (MessageHeaderException)
            {
                // Unreadable header: treat the message as not being a reply.
                relatesTo = null;
            }

            if (relatesTo == null)
            {
                return false;
            }

            return HandleRequestAsReplyCore(message);
        }
 
        // Asks the correlator for the pending request that `message` replies to and,
        // when one is found, hands it the reply. Returns whether the message was consumed.
        bool HandleRequestAsReplyCore(Message message)
        {
            // The type argument is spelled out because the result type cannot be
            // inferred from the arguments. The second argument is presumably the
            // correlator's "remove" flag - confirm against IRequestReplyCorrelator.
            IDuplexRequest request = this.correlator.Find<IDuplexRequest>(message, true);
            if (request == null)
            {
                return false;
            }

            request.GotReply(message);
            return true;
        }

        // Makes sure incoming messages are being received: unless a synchronous
        // Request() is currently running its own receive loop, registers the channel
        // handler if it has not been registered yet.
        public void EnsurePumping()
        {
            lock (this.ThisLock)
            {
                if (this.syncPumpEnabled)
                {
                    return;
                }

                ChannelHandler handler = this.ChannelHandler;
                if (!handler.HasRegisterBeenCalled)
                {
                    ChannelHandler.Register(handler);
                }
            }
        }

        // Starts an asynchronous receive on the wrapped channel. When the channel is
        // already faulted, no receive is started and a ChannelFaultedAsyncResult marker
        // is returned instead; EndTryReceive recognizes that marker.
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (this.channel.State != CommunicationState.Faulted)
            {
                return this.channel.BeginTryReceive(timeout, callback, state);
            }

            return new ChannelFaultedAsyncResult(callback, state);
        }
 
        // Completes a receive started by BeginTryReceive.
        // Returns false when the channel's receive timed out. Returns true otherwise:
        // requestContext wraps the received message, or is null when the channel was
        // faulted or returned a null message (in both cases outstanding requests are
        // aborted first).
        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            if (result is ChannelFaultedAsyncResult)
            {
                this.AbortRequests();
                requestContext = null;
                return true;
            }

            Message received;
            if (!this.channel.EndTryReceive(result, out received))
            {
                requestContext = null;
                return false;
            }

            if (received == null)
            {
                this.AbortRequests();
                requestContext = null;
            }
            else
            {
                requestContext = new DuplexRequestContext(this.channel, received, this);
            }

            return true;
        }

        // Starts an asynchronous one-way send on the wrapped channel.
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult sendResult = channel.BeginSend(message, timeout, callback, state);
            return sendResult;
        }
 
        // Completes a send started by BeginSend.
        public void EndSend(IAsyncResult result)
        {
            channel.EndSend(result);
        }
 
        // Sends a one-way message on the wrapped channel.
        public void Send(Message message, TimeSpan timeout)
        {
            channel.Send(message, timeout);
        }

        // Starts an asynchronous request/reply exchange over the duplex channel.
        // The request is registered with the correlator, the message is sent, and the
        // receive pump is started if necessary. The returned IAsyncResult (an
        // AsyncDuplexRequest) must be passed to EndRequest.
        // Exceptions thrown while preparing, registering or starting the send propagate
        // to the caller; in that case the registration (if any) is rolled back.
        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            bool success = false;
            // Tracks whether RequestStarting ran, so the finally block only undoes a
            // registration that actually happened. Without this, a failure in
            // PrepareRequest or the AsyncDuplexRequest constructor would decrement
            // `pending` without a matching increment.
            bool registered = false;
            AsyncDuplexRequest duplexRequest = null;

            try
            {
                RequestReplyCorrelator.PrepareRequest(message);
                duplexRequest = new AsyncDuplexRequest(message, this, timeout, callback, state);

                lock (this.ThisLock)
                {
                    this.RequestStarting(message, duplexRequest);
                    registered = true;
                }

                IAsyncResult result = this.channel.BeginSend(message, timeout, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(this.SendCallback)), duplexRequest);

                // Asynchronous completions are handled by SendCallback instead.
                if (result.CompletedSynchronously)
                    duplexRequest.FinishedSend(result, true);

                EnsurePumping();

                success = true;
                return duplexRequest;
            }
            finally
            {
                lock (this.ThisLock)
                {
                    if (success)
                    {
                        // The request could not complete until now; allow it to, and
                        // complete it immediately if it is already done.
                        duplexRequest.EnableCompletion();
                    }
                    else if (registered)
                    {
                        this.RequestCompleting(duplexRequest);
                    }
                }
            }
        }

        // Completes a request started by BeginRequest and returns the reply.
        // Throws ArgumentException when `result` did not come from BeginRequest.
        public Message EndRequest(IAsyncResult result)
        {
            AsyncDuplexRequest duplexRequest = result as AsyncDuplexRequest;
            if (duplexRequest != null)
            {
                return duplexRequest.End();
            }

            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentException(SR.GetString(SR.InvalidAsyncResult)));
        }

        // Synchronously receives the next message from the wrapped channel.
        // Returns false when the channel's receive timed out. Returns true otherwise:
        // requestContext wraps the received message, or is null when the channel is
        // faulted or returned a null message (in both cases outstanding requests are
        // aborted first).
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            requestContext = null;

            if (this.channel.State == CommunicationState.Faulted)
            {
                this.AbortRequests();
                return true;
            }

            Message received;
            if (!this.channel.TryReceive(timeout, out received))
            {
                return false;
            }

            if (received == null)
            {
                this.AbortRequests();
            }
            else
            {
                requestContext = new DuplexRequestContext(this.channel, received, this);
            }

            return true;
        }

        // Sends `message` and blocks until the correlated reply arrives or `timeout`
        // elapses.
        //
        // Two strategies are used:
        //  * "optimized": when nothing is pumping the channel yet (Pumping is false),
        //    this thread sends the request and then receives from the channel itself
        //    until it sees the message whose RelatesTo equals the request's MessageId.
        //    Other messages received meanwhile are offered to HandleRequestAsReply
        //    and closed if no pending request claims them.
        //  * otherwise: a SyncDuplexRequest is registered with the correlator, the
        //    message is sent, and this thread waits on that request for its reply.
        //
        // Returns the reply message; returns null in the optimized path when the channel
        // yields a null message. Throws the exception built by
        // GetReceiveTimeoutException when no reply arrives in time.
        //
        // NOTE(review): in the non-optimized path nothing here calls RequestCompleting
        // when Send throws or WaitForReply times out (only SyncDuplexRequest.GotReply
        // does) - confirm whether `pending`/`requests` cleanup is expected elsewhere.
        public Message Request(Message message, TimeSpan timeout)
        {
            SyncDuplexRequest duplexRequest = null;
            bool optimized = false;

            RequestReplyCorrelator.PrepareRequest(message);

            lock (this.ThisLock)
            {
                // Claim the receive loop for this thread if nobody is pumping yet.
                if (!Pumping)
                {
                    optimized = true;
                    syncPumpEnabled = true;
                }

                if (!optimized)
                    duplexRequest = new SyncDuplexRequest(this);

                // In the optimized path duplexRequest is null, so only `pending` is
                // incremented; nothing is added to the correlator.
                this.RequestStarting(message, duplexRequest);
            }

            if (optimized)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                UniqueId messageId = message.Headers.MessageId;

                try
                {
                    this.channel.Send(message, timeoutHelper.RemainingTime());
                    if (DiagnosticUtility.ShouldUseActivity &&
                        ServiceModelActivity.Current != null &&
                        ServiceModelActivity.Current.ActivityType == ActivityType.ProcessAction)
                    {
                        ServiceModelActivity.Current.Suspend();
                    }

                    // Receive until our own reply shows up, the channel yields a null
                    // message, or the remaining time runs out.
                    for (;;)
                    {
                        TimeSpan remaining = timeoutHelper.RemainingTime();
                        Message reply;

                        if (!this.channel.TryReceive(remaining, out reply))
                        {
                            throw TraceUtility.ThrowHelperError(this.GetReceiveTimeoutException(timeout), message);
                        }

                        if (reply == null)
                        {
                            this.AbortRequests();
                            return null;
                        }

                        if (reply.Headers.RelatesTo == messageId)
                        {
                            this.ThrowIfInvalidReplyIdentity(reply);
                            return reply;
                        }
                        else if (!this.HandleRequestAsReply(reply))
                        {
                            // SFx drops a message here
                            if (DiagnosticUtility.ShouldTraceInformation)
                            {
                                EndpointDispatcher dispatcher = null;
                                if (this.ChannelHandler != null && this.ChannelHandler.Channel != null)
                                {
                                    dispatcher = this.ChannelHandler.Channel.EndpointDispatcher;
                                }
                                TraceUtility.TraceDroppedMessage(reply, dispatcher);
                            }
                            reply.Close();
                        }
                    }
                }
                finally
                {
                    lock (this.ThisLock)
                    {
                        // Give up the receive loop; if other requests are still
                        // outstanding, hand receiving over to the channel handler.
                        this.RequestCompleting(null);
                        syncPumpEnabled = false;
                        if (this.pending > 0)
                            EnsurePumping();
                    }
                }
            }
            else
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                this.channel.Send(message, timeoutHelper.RemainingTime());
                EnsurePumping();
                return duplexRequest.WaitForReply(timeoutHelper.RemainingTime());
            }
        }

        // ASSUMPTION: caller holds lock (this.ThisLock)
        // Records that a request is starting. A non-null `request` is added to the
        // outstanding list and registered with the correlator under `message`; a null
        // `request` only bumps the pending count.
        void RequestStarting(Message message, IDuplexRequest request)
        {
            if (request != null)
            {
                this.Requests.Add(request);
                this.correlator.Add(message, request);
            }

            this.pending += 1;
        }

        // ASSUMPTION: caller holds lock (this.ThisLock)
        // Records that a request has finished. When it was the last pending request the
        // whole outstanding list is dropped; otherwise a non-null `request` is removed
        // from the list if the list still exists.
        void RequestCompleting(IDuplexRequest request)
        {
            this.pending -= 1;

            if (this.pending == 0)
            {
                this.requests = null;
                return;
            }

            if (request == null || this.requests == null)
            {
                return;
            }

            this.requests.Remove(request);
        }
 
        // Completion callback for the BeginSend issued by BeginRequest. The async state
        // is the AsyncDuplexRequest being sent.
        void SendCallback(IAsyncResult result)
        {
            AsyncDuplexRequest duplexRequest = result.AsyncState as AsyncDuplexRequest;
            if (duplexRequest == null)
            {
                DiagnosticUtility.DebugAssert("DuplexChannelBinder.RequestCallback: (duplexRequest != null)");
            }

            // Synchronous completions are finished inline by BeginRequest.
            if (result.CompletedSynchronously)
            {
                return;
            }

            duplexRequest.FinishedSend(result, false);
        }

        // Verifies the reply sender's identity against `address` using the security
        // context attached to the reply. Kept out-of-line via NoInlining.
        [MethodImpl(MethodImplOptions.NoInlining)]
        void EnsureIncomingIdentity(SecurityMessageProperty property, EndpointAddress address, Message reply)
        {
            IdentityVerifier verifier = this.IdentityVerifier;
            verifier.EnsureIncomingIdentity(address, property.ServiceSecurityContext.AuthorizationContext);
        }
 
        // Verifies the identity of the sender of `reply` on non-session channels.
        // Verification runs only when the reply carries a security property and the
        // channel has a remote address. A null `reply` is accepted and skipped:
        // requests that were aborted finish without a reply
        // (SyncDuplexRequest.WaitForReply and AsyncDuplexRequest.End pass their reply
        // field here, which stays null unless GotReply ran).
        void ThrowIfInvalidReplyIdentity(Message reply)
        {
            if (reply == null || this.isSession)
            {
                return;
            }

            SecurityMessageProperty property = reply.Properties.Security;
            EndpointAddress address = this.channel.RemoteAddress;

            if ((property != null) && (address != null))
            {
                EnsureIncomingIdentity(property, address, reply);
            }
        }

        // Forwards to the wrapped channel's WaitForMessage.
        public bool WaitForMessage(TimeSpan timeout)
        {
            bool available = channel.WaitForMessage(timeout);
            return available;
        }

        // Forwards to the wrapped channel's BeginWaitForMessage.
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult waitResult = channel.BeginWaitForMessage(timeout, callback, state);
            return waitResult;
        }
 
        // Forwards to the wrapped channel's EndWaitForMessage.
        public bool EndWaitForMessage(IAsyncResult result)
        {
            bool available = channel.EndWaitForMessage(result);
            return available;
        }
 
        // Request context for a message received on the duplex channel. Replies are
        // sent back over that same channel; a null reply message is a no-op.
        class DuplexRequestContext : RequestContextBase
        {
            IDuplexChannel channel;
            DuplexChannelBinder binder;

            internal DuplexRequestContext(IDuplexChannel channel, Message request, DuplexChannelBinder binder)
                : base(request, binder.DefaultCloseTimeout, binder.DefaultSendTimeout)
            {
                this.binder = binder;
                this.channel = channel;
            }

            protected override void OnAbort()
            {
                // Intentionally empty.
            }

            protected override void OnClose(TimeSpan timeout)
            {
                // Intentionally empty.
            }

            protected override void OnReply(Message message, TimeSpan timeout)
            {
                if (message == null)
                {
                    return;
                }

                this.channel.Send(message, timeout);
            }

            protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                ReplyAsyncResult replyResult = new ReplyAsyncResult(this, message, timeout, callback, state);
                return replyResult;
            }

            protected override void OnEndReply(IAsyncResult result)
            {
                ReplyAsyncResult.End(result);
            }

            // Async result for OnBeginReply: sends the reply on the context's channel
            // and completes when the send finishes.
            class ReplyAsyncResult : AsyncResult
            {
                // Shared send-completion callback, created on first use.
                static AsyncCallback onSend;
                DuplexRequestContext context;

                public ReplyAsyncResult(DuplexRequestContext context, Message message, TimeSpan timeout, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    // Nothing to send: complete synchronously right away.
                    if (message == null)
                    {
                        base.Complete(true);
                        return;
                    }

                    if (onSend == null)
                    {
                        onSend = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnSend));
                    }

                    this.context = context;
                    IAsyncResult sendResult = context.channel.BeginSend(message, timeout, onSend, this);
                    if (sendResult.CompletedSynchronously)
                    {
                        context.channel.EndSend(sendResult);
                        base.Complete(true);
                    }
                    // Otherwise OnSend finishes the operation.
                }

                // NOTE(review): if AsyncResult.End is a generic method, its type argument
                // appears to be missing here - confirm against the AsyncResult base class.
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End(result);
                }

                static void OnSend(IAsyncResult result)
                {
                    // Synchronous completions are finished by the constructor.
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }

                    ReplyAsyncResult self = (ReplyAsyncResult)result.AsyncState;
                    Exception failure = null;
                    try
                    {
                        self.context.channel.EndSend(result);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (DiagnosticUtility.IsFatal(e))
                        {
                            throw;
                        }

                        // Non-fatal failures are reported through the async result.
                        failure = e;
                    }

                    self.Complete(false, failure);
                }
            }
        }

        // Contract for a request waiting on a correlated reply.
        interface IDuplexRequest
        {
            // Called with the reply message once the correlator matches it to this request.
            void GotReply(Message reply);

            // Called when the binder gives up on outstanding requests (see AbortRequests).
            void Abort();
        }

        // Blocking request used by Request() when the channel is pumped elsewhere.
        // The caller waits on an event that is signalled when the reply arrives or the
        // request is aborted.
        class SyncDuplexRequest : IDuplexRequest
        {
            DuplexChannelBinder parent;
            Message reply;
            ManualResetEvent wait = new ManualResetEvent(false);
            // Counts CloseWaitHandle calls; the second call closes the event.
            int waitCount = 0;

            internal SyncDuplexRequest(DuplexChannelBinder parent)
            {
                this.parent = parent;
            }

            // Releases the waiter without supplying a reply.
            public void Abort()
            {
                this.wait.Set();
            }

            // Blocks until the event is signalled or `timeout` elapses. Throws the
            // binder's receive-timeout exception on timeout; otherwise verifies the
            // reply identity and returns the reply.
            internal Message WaitForReply(TimeSpan timeout)
            {
                try
                {
                    bool signalled = TimeoutHelper.WaitOne(this.wait, timeout, false);
                    if (!signalled)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.GetReceiveTimeoutException(timeout));
                    }
                }
                finally
                {
                    this.CloseWaitHandle();
                }

                this.parent.ThrowIfInvalidReplyIdentity(this.reply);
                return this.reply;
            }

            // Unregisters the request from the binder, stores the reply and wakes the waiter.
            public void GotReply(Message reply)
            {
                lock (this.parent.ThisLock)
                {
                    this.parent.RequestCompleting(this);
                }

                this.reply = reply;
                this.wait.Set();
                this.CloseWaitHandle();
            }

            // Called once by the waiter and once by GotReply; whichever call comes
            // second closes the event.
            void CloseWaitHandle()
            {
                int calls = Interlocked.Increment(ref this.waitCount);
                if (calls == 2)
                {
                    this.wait.Close();
                }
            }
        }
 
        // Asynchronous request returned by BeginRequest as its IAsyncResult.
        //
        // The request completes (see IsDone) once EnableCompletion has been called AND
        // one of the following holds: the send finished and a reply arrived, the send
        // failed, the timer fired, or the request was aborted.
        //
        // Every state-changing method follows the same pattern: under the binder lock it
        // samples IsDone, mutates one piece of state, samples IsDone again, and only the
        // call that flips IsDone from false to true goes on to run Done().
        class AsyncDuplexRequest : AsyncResult, IDuplexRequest
        {
            static WaitCallback timerCallback = new WaitCallback(AsyncDuplexRequest.TimerCallback);

            bool aborted;
            // Set by EnableCompletion; until then IsDone is always false.
            bool enableComplete;
            bool gotReply;
            // Non-fatal exception captured from EndSend, reported on completion.
            Exception sendException;
            // Non-null once FinishedSend has run.
            IAsyncResult sendResult;
            DuplexChannelBinder parent;
            Message reply;
            bool timedOut;
            TimeSpan timeout;
            // Null when the timeout is TimeSpan.MaxValue, and after Done() cancels it.
            IOThreadTimer timer;
            // Activity extracted from the request message when activity tracing is on.
            ServiceModelActivity activity;

            // Creates the request and, unless `timeout` is TimeSpan.MaxValue, starts the
            // timeout timer immediately.
            internal AsyncDuplexRequest(Message message, DuplexChannelBinder parent, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.parent = parent;
                this.timeout = timeout;

                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(AsyncDuplexRequest.timerCallback, this, true);
                    this.timer.Set(timeout);
                }
                if (DiagnosticUtility.ShouldUseActivity)
                {
                    this.activity = TraceUtility.ExtractActivity(message);
                }
            }

            // Whether the request has reached a terminal state and may be completed.
            // Callers read this while holding the binder lock.
            bool IsDone
            {
                get
                {
                    if (!this.enableComplete)
                    {
                        return false;
                    }

                    return (((this.sendResult != null) && this.gotReply) ||
                            (this.sendException != null) ||
                            this.timedOut ||
                            this.aborted);
                }
            }

            // Marks the request aborted and completes it if that makes it done.
            public void Abort()
            {
                bool done;

                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.aborted = true;
                    done = !wasDone && this.IsDone;
                }

                if (done)
                    this.Done(false);
            }

            // Finishes the request: cancels the timer, unregisters from the binder and
            // completes the async result. A send exception takes precedence over a
            // timeout; otherwise the result completes successfully (the reply, possibly
            // null, is returned later by End).
            void Done(bool completedSynchronously)
            {
                // Make sure that we are acting on the Reply activity.
                ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ?
                    TraceUtility.ExtractActivity(this.reply) : null;
                using (ServiceModelActivity.BoundOperation(replyActivity))
                {
                    if (this.timer != null)
                    {
                        this.timer.Cancel();
                        this.timer = null;
                    }

                    lock (this.parent.ThisLock)
                    {
                        this.parent.RequestCompleting(this);
                    }

                    if (this.sendException != null)
                        this.Complete(completedSynchronously, this.sendException);
                    else if (this.timedOut)
                        this.Complete(completedSynchronously, this.parent.GetReceiveTimeoutException(this.timeout));
                    else
                        this.Complete(completedSynchronously);
                }
            }

            // Called by BeginRequest once the request has been fully started. Allows
            // completion, and completes right away (as a synchronous completion) if a
            // terminal state was already reached.
            public void EnableCompletion()
            {
                bool done;

                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.enableComplete = true;
                    done = !wasDone && this.IsDone;
                }

                if (done)
                    this.Done(true);
            }

            // Ends the channel send and records its outcome. Non-fatal exceptions from
            // EndSend are captured and surfaced when the request completes; fatal ones
            // are rethrown.
            public void FinishedSend(IAsyncResult sendResult, bool completedSynchronously)
            {
                Exception sendException = null;

                try
                {
                    this.parent.channel.EndSend(sendResult);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                        throw;

                    sendException = e;
                }

                bool done;

                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.sendResult = sendResult;
                    this.sendException = sendException;
                    done = !wasDone && this.IsDone;
                }

                if (done)
                    this.Done(completedSynchronously);
            }

            // Waits for completion (rethrowing any completion exception via
            // AsyncResult.End), verifies the reply identity and returns the reply.
            // NOTE(review): if AsyncResult.End is a generic method, its type argument
            // appears to be missing here - confirm against the AsyncResult base class.
            internal Message End()
            {
                AsyncResult.End(this);
                this.parent.ThrowIfInvalidReplyIdentity(this.reply);
                return this.reply;
            }

            // Stores the reply and completes the request if that makes it done. When
            // activity tracing is on, the reply is re-associated with the request's
            // activity and the reply's own activity is stopped.
            public void GotReply(Message reply)
            {
                bool done;

                ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ?
                    TraceUtility.ExtractActivity(reply) : null;

                using (ServiceModelActivity.BoundOperation(replyActivity))
                {
                    lock (this.parent.ThisLock)
                    {
                        bool wasDone = this.IsDone;
                        this.reply = reply;
                        this.gotReply = true;
                        done = !wasDone && this.IsDone;
                    }
                    if (replyActivity != null && DiagnosticUtility.ShouldUseActivity)
                    {
                        TraceUtility.SetActivity(reply, this.activity);
                        if (DiagnosticUtility.ShouldUseActivity && this.activity != null)
                        {
                            DiagnosticUtility.DiagnosticTrace.TraceTransfer(this.activity.Id);
                        }
                    }
                }
                if (DiagnosticUtility.ShouldUseActivity && replyActivity != null)
                {
                    replyActivity.Stop();
                }

                if (done)
                    this.Done(false);
            }

            // Marks the request timed out and completes it if that makes it done.
            void TimedOut()
            {
                bool done;

                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.timedOut = true;
                    done = !wasDone && this.IsDone;
                }

                if (done)
                    this.Done(false);
            }

            // Timer entry point; `state` is the AsyncDuplexRequest that owns the timer.
            static void TimerCallback(object state)
            {
                ((AsyncDuplexRequest)state).TimedOut();
            }
        }
 
        // Marker async result returned by BeginTryReceive when the channel is already
        // faulted; EndTryReceive detects it by type.
        class ChannelFaultedAsyncResult : CompletedAsyncResult
        {
            public ChannelFaultedAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
                // No state beyond the base class.
            }
        }
    } 
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK