How to reimplement the Where () operator for PLINQ?

I would like to rewrite one of my extensions LINQto take advantage of parallelism. In any case, I do not know where to start.

As an example of training, I would like to know how to rewrite operator implementation Where(), but it works on ParallelQuery.

public static ParallelQuery<TSource> Where<TSource>(
   this ParallelQuery<TSource> source, 
   Func<TSource, bool> predicate)
{
    //implementation
}

One could write:

someList.AsParallel().Where(...)

The notation a Where, which is executed in sequential order, is trivial:

public static IEnumerable<TSource> Where<TSource>( 
    this IEnumerable<TSource> source, 
    Func<TSource, bool> predicate) 
{ 
    foreach (TSource item in source) 
    { 
        if (predicate(item)) 
        { 
            yield return item; 
        } 
    } 
}

I meant just to wrap the predicate around Parallel.ForEach()(and by clicking the results in List / Array), but I don't think this is the way to go.

, ( SO), . , , , . , , , , , - , , , ( , , )


, LINQ, :

public static IEnumerable<TSource> WhereContains<TSource, TKey>(
     this IEnumerable<TSource> source, 
     IEnumerable<TKey> values,
     Func<TSource, TKey> keySelector)
{
    HashSet<TKey> elements = new HashSet<TKey>(values);

    foreach (TSource item in source)
    {
        if (elements.Contains(keySelector(item)))
        {
            yield return item;
        }
    }
}
+4
2

, ParallelQuery<T> - , ParallelQuery<T> , .

, PLINQ, , . , , Where Contains, ... .

public static ParallelQuery<TSource> WhereContains<TSource, TKey>(
    this ParallelQuery<TSource> source,
    IEnumerable<TKey> values,
    Func<TSource, TKey> keySelector)
{
    HashSet<TKey> elements = new HashSet<TKey>(values);

    return source.Where(item => elements.Contains(keySelector(item)));
}

Where ( ) Contains , , , HashSet , , .


, , , , .

class Program
{
    static void Main(string[] args)
    {
        List<int> items = new List<int>(Enumerable.Range(0,100));

        int[] values = {5, 12, 25, 17, 0};

        Console.WriteLine("thread: {0}", Environment.CurrentManagedThreadId);

        var result = items.AsParallel().WhereContains(values, x=>x).ToList();

        Console.WriteLine("Done");
        Console.ReadLine();
    }
}

static class Extensions
{
    public static ParallelQuery<TSource> WhereContains<TSource, TKey>(
        this ParallelQuery<TSource> source,
        IEnumerable<TKey> values,
        Func<TSource, TKey> keySelector)
    {
        HashSet<TKey> elements = new HashSet<TKey>(values);

        return source.Where(item =>
        {
            Console.WriteLine("item:{0} thread: {1}", item, Environment.CurrentManagedThreadId);
            return elements.Contains(keySelector(item));
        });
    }
}
+1

?

public static ParallelQuery<TSource> Where<TSource>(
    this ParallelQuery<TSource> source, 
    Func<TSource, bool> predicate)
{
    return
        source
            .SelectMany(x =>
                predicate(x)
                ? new TSource[] { x } 
                : Enumerable.Empty<TSource>());
}
+1

All Articles