F# - Interacting With WPF Dispatcher Via F# Interactive Window

by Dean 8. October 2010 08:44

Here's an interesting scenario that one of the quants in the front-office team discussed with me yesterday.

"Can I pop up a WPF Window/Control from the FSI window in Visual Studio?" - "Yes, that's easy," I said...

"...and then interact with it ASYNCHRONOUSLY via F# commands in the FSI window?" - "Hmmmm," I said.

The first bit is easy and has been done many times before with WPF and WinForms: instantiate UI controls and popups from within F# Interactive, and update those controls by running F# commands. But you're doing your updates synchronously on the main UI thread!

What if the update you're running is a long-running task? The WPF popup is going to freeze on you, and to solve that you face the classic issue of STA thread affinity that applies to both WPF and WinForms.

The common solution in standard WPF/WinForms apps is to marshal messages to the UI message pump via delegates, but how would that work in F# Interactive?

After a little head-scratching, I figured it out, and here is a sample of how it's done:

 

#light

#r "WindowsBase"
#r "PresentationCore"
#r "PresentationFramework"

open System
open System.Windows
open System.Windows.Controls
open System.Threading
open System.Windows.Threading

// create a reference to a WPF control in interactive window
let mutable (wp : TextBlock) = null

// create a new WPF gui thread with a running dispatcher and message pump
let thread = new System.Threading.Thread(fun() ->
    let window = new System.Windows.Window(Name="Test",Width=500.0,Height=500.0)
    wp <- new TextBlock()
    wp.Text <- "test1"
    window.Content <- wp
    window.Visibility <- Visibility.Visible
    window.Show()
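    // shut the dispatcher down when the window closes, so Dispatcher.Run() returns
    // and the thread ends on its own (no need to abort it)
    window.Closed.Add(fun _ ->
        Dispatcher.CurrentDispatcher.BeginInvokeShutdown(DispatcherPriority.Background))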
    Dispatcher.Run()
    )
thread.SetApartmentState(ApartmentState.STA)
thread.IsBackground <- true

// start the thread, which will invoke the popup ui
thread.Start()

// once the WPF window is up, you can marshal updates via its running dispatcher
wp.Dispatcher.BeginInvoke(Action(fun _ -> wp.Text <- "test2")) |> ignore

To test this, run all but the last line in the interactive window, and you should see a popup like this:

 

 

Then run the final line, which marshals the update onto the new GUI thread, and you get this:

 

Proving the solution:

What this means is that we can create sophisticated WPF controls, invoke them from the FSI window, and then apply long-running updates via FSI without the WPF window freezing on us.

That should keep the trading desk guys happy :)

Dean


F# | Threading | WPF

C# - Fast Parallel ConcurrentList<T> Implementation

by Dean 7. October 2010 09:59

The new Task Parallel Library (TPL) introduced in the .NET 4 framework simplifies many aspects of writing parallel code, and removes much of the thread synchronisation logic that is usually required.

One of the really nice aspects of TPL is the System.Collections.Concurrent namespace, which includes a number of collection classes that fully support multithreaded access and fast parallel access. The only glaring omission (in my opinion) is the absence of a ConcurrentList<T> class.

So, I thought I'd create one, and the code is below:

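using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class ConcurrentList<T> : IList<T>, IList
{
    private readonly List<T> underlyingList = new List<T>();
    private readonly object syncRoot = new object();
    private readonly ConcurrentQueue<T> underlyingQueue;
    // both flags are read and written by different threads outside the lock, hence volatile
    private volatile bool requiresSync;
    private volatile bool isDirty;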

    public ConcurrentList()
    {
        underlyingQueue = new ConcurrentQueue<T>();
    }

    public ConcurrentList(IEnumerable<T> items)
    {
        underlyingQueue = new ConcurrentQueue<T>(items);
        // the queue already holds items, so the first read must copy them into the list
        isDirty = true;
    }

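    private void UpdateLists()
    {
        if (!isDirty)
            return;
        lock (syncRoot)
        {
            requiresSync = true;
            // clear the flag before draining, so an Add that lands mid-drain marks the list dirty again
            isDirty = false;
            T temp;
            while (underlyingQueue.TryDequeue(out temp))
                underlyingList.Add(temp);
            requiresSync = false;
        }
    }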

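    public IEnumerator<T> GetEnumerator()
    {
        lock (syncRoot)
        {
            UpdateLists();
            // enumerate a snapshot: the enumerator is used after the lock is released,
            // and a live List<T> enumerator would throw if another thread modified the list
            return new List<T>(underlyingList).GetEnumerator();
        }
    }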

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void Add(T item)
    {
        if (requiresSync)
            lock (syncRoot)
                underlyingQueue.Enqueue(item);
        else
            underlyingQueue.Enqueue(item);
        isDirty = true;
    }

    public int Add(object value)
    {
        if (requiresSync)
            lock (syncRoot)
                underlyingQueue.Enqueue((T)value);
        else
            underlyingQueue.Enqueue((T)value);
        isDirty = true;
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf((T)value);
        }
    }

    public bool Contains(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Contains((T)value);
        }
    }

    public int IndexOf(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf((T)value);
        }
    }

    public void Insert(int index, object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Insert(index, (T)value);
        }
    }

    public void Remove(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Remove((T)value);
        }
    }

    public void RemoveAt(int index)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.RemoveAt(index);
        }
    }

    T IList<T>.this[int index]
    {
        get
        {
            lock (syncRoot)
            {
                UpdateLists();
                return underlyingList[index];
            }
        }
        set
        {
            lock (syncRoot)
            {
                UpdateLists();
                underlyingList[index] = value;
            }
        }
    }

    object IList.this[int index]
    {
        get { return ((IList<T>)this)[index]; }
        set { ((IList<T>)this)[index] = (T)value; }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }

    public bool IsFixedSize
    {
        get { return false; }
    }

    public void Clear()
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Clear();
        }
    }

    public bool Contains(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Contains(item);
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.CopyTo(array, arrayIndex);
        }
    }

    public bool Remove(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Remove(item);
        }
    }

    public void CopyTo(Array array, int index)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.CopyTo((T[])array, index);
        }
    }

    public int Count
    {
        get
        {
            lock (syncRoot)
            {
                UpdateLists();
                return underlyingList.Count;
            }
        }
    }

    public object SyncRoot
    {
        get { return syncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public int IndexOf(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf(item);
        }
    }

    public void Insert(int index, T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Insert(index, item);
        }
    }
}

The design of this class is simple. Adding items to the list actually adds them to an underlying ConcurrentQueue<T> object, which lets the list be added to in a high-performance, thread-safe manner that can take full advantage of task parallelism. Tests have shown this to be as fast as the other concurrent collection classes when adding items.

Then, when any of the other public members of the ConcurrentList<T> class is accessed, it freezes thread access to its underlying ConcurrentQueue<T> store and copies the items from the ConcurrentQueue<T> to an underlying List<T>. If the ConcurrentQueue<T> hasn't been added to since the last update, this step is skipped, which improves performance once the list is fully populated. The call is then forwarded to the same IList<T> or IList member of the underlying list (effectively decorating the underlying List<T>). Thread synchronisation is implemented on all of these members so that access is serialized, making the class thread-safe.
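The queue-then-drain idea described above can be reduced to a few lines. The following is only a minimal sketch, not the full class: the names DrainOnReadList<T> and DrainOnReadDemo are hypothetical, it exposes just Add, Count and a read-only indexer, and it leaves out the dirty flag, on the assumption that TryDequeue on an empty queue is cheap enough for an illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, cut-down illustration of the queue-then-drain design.
public class DrainOnReadList<T>
{
    private readonly ConcurrentQueue<T> pending = new ConcurrentQueue<T>();
    private readonly List<T> items = new List<T>();
    private readonly object gate = new object();

    // Writers only touch the thread-safe queue and never take the lock.
    public void Add(T item)
    {
        pending.Enqueue(item);
    }

    // Readers move everything queued so far into the list, then read the list.
    public int Count
    {
        get
        {
            lock (gate)
            {
                Drain();
                return items.Count;
            }
        }
    }

    public T this[int index]
    {
        get
        {
            lock (gate)
            {
                Drain();
                return items[index];
            }
        }
    }

    // Must be called with the lock held.
    private void Drain()
    {
        T item;
        while (pending.TryDequeue(out item))
            items.Add(item);
    }
}

public static class DrainOnReadDemo
{
    // Adds 4 x 2500 items from parallel loop iterations and returns the final count.
    public static int Run()
    {
        var list = new DrainOnReadList<int>();
        Parallel.For(0, 4, worker =>
        {
            for (var i = 0; i < 2500; i++)
                list.Add(i);
        });
        return list.Count;
    }
}
```

Because all four loop bodies have finished before Count is read, DrainOnReadDemo.Run() sees every queued item. The order of items in the list depends on how the parallel adds interleave, so only the count is predictable.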

This implementation has the following characteristics:

  1. In a single-threaded scenario, it has very similar performance to List<T> in all operations.
  2. In a multi-threaded (task parallel) item-adding scenario, it was much faster at adding items than List<T> (by several orders of magnitude).
  3. In a multi-threaded access scenario (except when adding items), it has very similar performance to List<T>.

In order to test these scenarios, I wrote the following test code:

class Program
    {
        static void DoWork(ICollection<int> list, int count)
        {
            for (var i = 0; i < count; i++)
            {
                list.Add(i);
                // use spinwait to emulate work but avoiding context switching
                Thread.SpinWait(100000);
            }

        }
        static void Main(string[] args)
        {
            Console.WriteLine("standard List<T> - 10000 work items");
            var list1 = new List<int>();
            var start1 = DateTime.Now.Ticks;
            DoWork(list1, 10000);
            var end1 = DateTime.Now.Ticks;
            var c1 = list1.Count; // accesses list
            var cend1 = DateTime.Now.Ticks;
            Console.WriteLine();
            Console.WriteLine("Work Time: {0} - milliseconds", (end1 - start1) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine("Get Count Time: {0} - milliseconds", (cend1 - end1) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine();
            Console.WriteLine();

            Console.WriteLine("ConcurrentList<T> - 10000 work items on single thread");
            var list2 = new ConcurrentList<int>();
            var start2 = DateTime.Now.Ticks;
            DoWork(list2, 10000);
            var end2 = DateTime.Now.Ticks;
            var c2 = list2.Count; // accesses list, update performed
            var cend2 = DateTime.Now.Ticks;
            Console.WriteLine();
            Console.WriteLine("Work Time: {0} - milliseconds", (end2 - start2) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine("Get Count Time: {0} - milliseconds", (cend2 - end2) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine();
            Console.WriteLine();

            Console.WriteLine("ConcurrentList<T> - 10000 work items on 4 parallel tasks");
            var list3 = new ConcurrentList<int>();
            var start3 = DateTime.Now.Ticks;
            var tasks3 = new Task[4]
              {
                  Task.Factory.StartNew(() => DoWork(list3,2500)),
                  Task.Factory.StartNew(() => DoWork(list3,2500)),
                  Task.Factory.StartNew(() => DoWork(list3,2500)),
                  Task.Factory.StartNew(() => DoWork(list3,2500))
              };
            Task.WaitAll(tasks3);
            var end3 = DateTime.Now.Ticks;
            var c3 = list3.Count; // accesses list, update performed
            var cend3 = DateTime.Now.Ticks;
            Console.WriteLine();
            Console.WriteLine("Work Time: {0} - milliseconds", (end3 - start3) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine("Get Count Time: {0} - milliseconds", (cend3 - end3) / TimeSpan.TicksPerMillisecond);
            Console.WriteLine();

            Console.ReadLine();
        }
    }

And here is a screenshot of the result

 

 


C# | Threading | WPF

Thread-Safe & Dispatcher-Safe Observable Collection for WPF

by Dean 1. February 2010 12:22

A common problem in WPF (& Silverlight) development is when you are working with multiple threads that need to change a collection that is a binding source and implements INotifyCollectionChanged.

Basically, the standard ObservableCollection<T> will only allow updates from the dispatcher thread, which means you need to write a lot of code for the worker threads to marshal changes onto the main message pump via the dispatcher. This can be a bit tedious, so I recently wrote a collection that performs all of the necessary marshalling internally, so users of this type do not have to be concerned about thread affinity issues.
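For context, this is roughly what the manual marshalling looks like when a worker thread feeds a plain ObservableCollection<T>. It is a minimal sketch under some assumptions: the MarshalDemo class and its Run method are hypothetical names, the project references WindowsBase, and the calling thread is allowed to become the dispatcher thread for the duration of the call. No control is bound to the collection here, so the sketch only shows the pattern, not the cross-thread exception WPF raises when a bound collection is changed from the wrong thread.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Threading;
using System.Windows.Threading; // requires a reference to WindowsBase

public static class MarshalDemo
{
    // Runs a dispatcher on the calling thread, lets a worker thread queue three
    // adds onto it, and returns the number of items in the collection afterwards.
    public static int Run()
    {
        var dispatcher = Dispatcher.CurrentDispatcher;
        var items = new ObservableCollection<string>();

        var worker = new Thread(() =>
        {
            for (var i = 0; i < 3; i++)
            {
                var text = "item " + i;
                // The worker never touches the collection directly; it queues
                // the change so that it runs on the dispatcher thread.
                dispatcher.BeginInvoke(new Action(() => items.Add(text)));
            }
            // Queued at a lower priority than the adds, so they are processed first.
            dispatcher.BeginInvokeShutdown(DispatcherPriority.Background);
        });
        worker.IsBackground = true;
        worker.Start();

        Dispatcher.Run(); // pumps the queue until the shutdown request is processed
        return items.Count;
    }
}
```

Every worker that changes the collection has to repeat the BeginInvoke wrapper, which is the boilerplate the collection class in this post moves inside the collection itself.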

Also, I decided to use a ReaderWriterLock to provide thread-safety during updates to the collection.
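As a side note on the locking pattern, a reader/writer lock is usually released in a finally block so that an exception thrown while the lock is held cannot leave it locked. The sketch below shows that shape with ReaderWriterLockSlim, a newer type than the ReaderWriterLock used in this post and swapped in purely for illustration; GuardedList<T> is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal list guarded by a reader/writer lock.
public class GuardedList<T>
{
    private readonly List<T> items = new List<T>();
    private readonly ReaderWriterLockSlim sync = new ReaderWriterLockSlim();

    public void Add(T item)
    {
        sync.EnterWriteLock();          // exclusive: blocks readers and other writers
        try
        {
            items.Add(item);
        }
        finally
        {
            sync.ExitWriteLock();       // released even if Add throws
        }
    }

    public bool Contains(T item)
    {
        sync.EnterReadLock();           // shared: many readers may hold it at once
        try
        {
            return items.Contains(item);
        }
        finally
        {
            sync.ExitReadLock();
        }
    }

    public int Count
    {
        get
        {
            sync.EnterReadLock();
            try
            {
                return items.Count;
            }
            finally
            {
                sync.ExitReadLock();
            }
        }
    }
}
```

One difference to keep in mind: ReaderWriterLockSlim does not allow recursive acquisition by default, so an event handler that is called while the write lock is held and then reads the collection would throw, whereas ReaderWriterLock permits that.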

Here is my collection class:

 
public class SafeObservable<T> : IList<T>, INotifyCollectionChanged
{
    private IList<T> collection = new List<T>();
    private Dispatcher dispatcher;
    public event NotifyCollectionChangedEventHandler CollectionChanged;
    private ReaderWriterLock sync = new ReaderWriterLock();
 
    public SafeObservable()
    {
        dispatcher = Dispatcher.CurrentDispatcher;
    }
 
    public void Add(T item)
    {
        if (Thread.CurrentThread == dispatcher.Thread)
            DoAdd(item);
        else
            dispatcher.BeginInvoke((Action)(() => { DoAdd(item); }));
    }
 
    private void DoAdd(T item)
    {
        sync.AcquireWriterLock(Timeout.Infinite);
        collection.Add(item);
        if (CollectionChanged != null)
            CollectionChanged(this,
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        sync.ReleaseWriterLock();
    }
 
    public void Clear()
    {
        if (Thread.CurrentThread == dispatcher.Thread)
            DoClear();
        else
            dispatcher.BeginInvoke((Action)(() => { DoClear(); }));
    }
 
    private void DoClear()
    {
        sync.AcquireWriterLock(Timeout.Infinite);
        collection.Clear();
        if (CollectionChanged != null)
            CollectionChanged(this,
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        sync.ReleaseWriterLock();
    }
 
    public bool Contains(T item)
    {
        sync.AcquireReaderLock(Timeout.Infinite);
        var result = collection.Contains(item);
        sync.ReleaseReaderLock();
        return result;
    }
 
    public void CopyTo(T[] array, int arrayIndex)
    {
        // copying only reads the collection, so a reader lock is sufficient
        sync.AcquireReaderLock(Timeout.Infinite);
        collection.CopyTo(array, arrayIndex);
        sync.ReleaseReaderLock();
    }
 
    public int Count
    {
        get
        {
            sync.AcquireReaderLock(Timeout.Infinite);
            var result = collection.Count;
            sync.ReleaseReaderLock();
            return result;
        }
    }
 
    public bool IsReadOnly
    {
        get { return collection.IsReadOnly; }
    }
 
    public bool Remove(T item)
    {
        if (Thread.CurrentThread == dispatcher.Thread)
            return DoRemove(item);
        // BeginInvoke returns before DoRemove has run, so its Result is not yet available;
        // Invoke blocks the calling thread until the dispatcher has executed DoRemove.
        return (bool)dispatcher.Invoke(new Func<T, bool>(DoRemove), item);
    }
 
    private bool DoRemove(T item)
    {
        sync.AcquireWriterLock(Timeout.Infinite);
        var index = collection.IndexOf(item);
        if (index == -1)
        {
            sync.ReleaseWriterLock();
            return false;
        }
        var result = collection.Remove(item);
        if (result && CollectionChanged != null)
            CollectionChanged(this, new
                NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        sync.ReleaseWriterLock();
        return result;
    }
 
    public IEnumerator<T> GetEnumerator()
    {
        return collection.GetEnumerator();
    }
 
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return collection.GetEnumerator();
    }
 
    public int IndexOf(T item)
    {
        sync.AcquireReaderLock(Timeout.Infinite);
        var result = collection.IndexOf(item);
        sync.ReleaseReaderLock();
        return result;
    }
 
    public void Insert(int index, T item)
    {
        if (Thread.CurrentThread == dispatcher.Thread)
            DoInsert(index, item);
        else
            dispatcher.BeginInvoke((Action)(() => { DoInsert(index, item); }));
    }
 
    private void DoInsert(int index, T item)
    {
        sync.AcquireWriterLock(Timeout.Infinite);
        collection.Insert(index, item);
        if (CollectionChanged != null)
            CollectionChanged(this,
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item, index));
        sync.ReleaseWriterLock();
    }
 
    public void RemoveAt(int index)
    {
        if (Thread.CurrentThread == dispatcher.Thread)
            DoRemoveAt(index);
        else
            dispatcher.BeginInvoke((Action)(() => { DoRemoveAt(index); }));
    }
 
    private void DoRemoveAt(int index)
    {
        sync.AcquireWriterLock(Timeout.Infinite);
        if (collection.Count == 0 || collection.Count <= index)
        {
            sync.ReleaseWriterLock();
            return;
        }
        collection.RemoveAt(index);
        if (CollectionChanged != null)
            CollectionChanged(this,
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        sync.ReleaseWriterLock();
 
    }
 
    public T this[int index]
    {
        get
        {
            sync.AcquireReaderLock(Timeout.Infinite);
            var result = collection[index];
            sync.ReleaseReaderLock();
            return result;
        }
        set
        {
            sync.AcquireWriterLock(Timeout.Infinite);
            if (collection.Count == 0 || collection.Count <= index)
            {
                sync.ReleaseWriterLock();
                return;
            }
            collection[index] = value;
            sync.ReleaseWriterLock();
        }
 
    }
}
 

To test the effectiveness of this collection class, I wrote a simple WPF app, that bound to the new collection class and updated it via multiple threads:

 

<Window x:Class="WpfApplication1.Window1"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    Title="Window1" Height="300" Width="300">
    <StackPanel Orientation="Vertical" VerticalAlignment="Top">
        <Button Content="Start" Click="Button_Click" />
        <ListView Name="list" ItemsSource="{Binding}" DisplayMemberPath="Text" />        
    </StackPanel>
</Window>

And the code behind is below:

 

public partial class Window1 : Window
{
    class TestData
    {
        public string Text { get; set; }
    }
 
    private Random rand = new Random(DateTime.Now.Millisecond);
    private SafeObservable<TestData> data = new SafeObservable<TestData>();
    public Window1()
    {
        InitializeComponent();
    }
 
    void Button_Click(object sender, RoutedEventArgs e)
    {
        list.DataContext = data;
        List<Action> work = new List<Action>();
        for (int i = 0; i < 100; i++)
        {
            work.Add(new Action(DoWorkAdd));
            work.Add(new Action(DoWorkClear));
            work.Add(new Action(DoWorkRemove));
            work.Add(new Action(DoWorkRemoveAt));
            work.Add(new Action(DoWorkInsert));
            work.Add(new Action(DoWorkReplace));
        }
        for (int i = 0; i < 1000; i++)
            work[rand.Next(0, work.Count)].BeginInvoke(null, null);
 
    }
 
    void DoWorkAdd()
    {
        Thread.Sleep(rand.Next(500, 30000));
        data.Add(new TestData() { Text = string.Format("Thread {0} Added", Thread.CurrentThread.ManagedThreadId) });
    }
 
    void DoWorkClear()
    {
        Thread.Sleep(rand.Next(500, 10000));
        data.Clear();
        Debug.WriteLine((string.Format("Thread {0} Clear", Thread.CurrentThread.ManagedThreadId)));
    }
 
    void DoWorkRemove()
    {
        Thread.Sleep(rand.Next(500, 10000));
        if (data.Count == 0)
            return;
        var item = data[0];
        data.Remove(item);
        Debug.WriteLine((string.Format("Thread {0} Remove", Thread.CurrentThread.ManagedThreadId)));
    }
 
    void DoWorkRemoveAt()
    {
        Thread.Sleep(rand.Next(500, 10000));
        if (data.Count == 0)
            return;
        data.RemoveAt(0);
        Debug.WriteLine((string.Format("Thread {0} RemoveAt", Thread.CurrentThread.ManagedThreadId)));
    }
 
    void DoWorkInsert()
    {
        Thread.Sleep(rand.Next(500, 10000));
        data.Insert(rand.Next(0, data.Count), new TestData() 
            { Text = string.Format("Thread {0} Insert", Thread.CurrentThread.ManagedThreadId) });
    }
 
    void DoWorkReplace()
    {
        Thread.Sleep(rand.Next(500, 10000));
        data[rand.Next(0, data.Count)] = new TestData() 
            { Text = string.Format("Thread {0} Replace", Thread.CurrentThread.ManagedThreadId) };
    }
 
}

All my WPF app does is run a number of random actions against the collection from a variety of threads.

NOTE: When removing items from the collection I used the Reset action of NotifyCollectionChangedAction instead of Remove. This is because the Remove action doesn't work correctly in a multi-threaded scenario when the collection is used as a binding source for a list control in WPF.

If anyone has any suggestions or enhancements, please let me know.
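For comparison, here is a minimal sketch of raising a Remove notification that carries both the removed item and the index it occupied. This is offered as an assumption about one thing worth trying, not a confirmed explanation of the problem above: the index is looked up and the item removed in one step, so the two cannot drift apart. RemoveNotifier<T> is a hypothetical name, and the locking and dispatcher marshalling of the real collection are left out to keep it short.

```csharp
using System.Collections.Generic;
using System.Collections.Specialized;

// Hypothetical minimal collection that reports removals with their index.
public class RemoveNotifier<T> : INotifyCollectionChanged
{
    private readonly List<T> items = new List<T>();

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    public void Add(T item)
    {
        items.Add(item);
        Raise(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Add, item, items.Count - 1));
    }

    public bool Remove(T item)
    {
        var index = items.IndexOf(item);
        if (index < 0)
            return false;
        items.RemoveAt(index);
        // The (action, item, index) constructor records the removed item in
        // OldItems and its former position in OldStartingIndex.
        Raise(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Remove, item, index));
        return true;
    }

    private void Raise(NotifyCollectionChangedEventArgs args)
    {
        var handler = CollectionChanged; // copy to avoid a race with unsubscription
        if (handler != null)
            handler(this, args);
    }
}
```

Reset remains the safer choice when the index cannot be trusted, at the cost of the bound control rebuilding its whole view.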

Dean


DataBinding | C# | Threading

Disclaimer
The opinions expressed herein are my own personal opinions and do not represent my employer's view in anyway.

© Copyright 2012 Dean Chalk's Blog