Out-of-memory problem in a TPL Dataflow implementation of file IO operations

I implemented read and write operations using file IO and wrapped them in a TransformBlock to make them thread safe, instead of using a locking mechanism.

The problem is that when I try to write even 5 files in parallel, an out-of-memory exception is thrown, and this implementation also blocks the UI thread. The implementation is part of a Windows Phone project. Please suggest what is wrong with this implementation.

File I/O operations

    public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
    public static readonly FileIO _file = new FileIO();
    public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();

    public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow = new ExecutionDataflowBlockOptions
    {
        TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
        BoundedCapacity = 1
    };

    public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow = new ExecutionDataflowBlockOptions
    {
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
        BoundedCapacity = 1
    };

    public static async Task<T> LoadAsync<T>(string fileName)
    {
        T result = default(T);
        var transBlock = new TransformBlock<string, T>(async fName =>
        {
            return await LoadData<T>(fName);
        }, concurrentExecutionDataFlow);

        transBlock.Post(fileName);
        result = await transBlock.ReceiveAsync();
        return result;
    }

    public static async Task SaveAsync<T>(T obj, string fileName)
    {
        var transBlock = new TransformBlock<Tuple<T, string>, Task>(async tupleData =>
        {
            await SaveData(tupleData.Item1, tupleData.Item2);
        }, exclusiveExecutionDataFlow);

        transBlock.Post(new Tuple<T, string>(obj, fileName));
        await transBlock.ReceiveAsync();
    }

MainPage.xaml.cs Usage

    private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd"; // terminating semicolon was missing
    private static string fileName = string.Empty;
    private List<string> DataLstSample = new List<string>();
    private ObservableCollection<string> TestResults = new ObservableCollection<string>();
    private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
    List<Task> allTsk = new List<Task>();
    private Random rand = new Random();

    private string fileNameRand
    {
        get { return rand.Next(100).ToString(); }
    }

    public MainPage()
    {
        InitializeComponent();
        for (int i = 0; i < 5; i++)
        {
            DataLstSample.Add((i % 2) == 0 ? data : data1);
        }
    }

    private void Button_Click(object sender, RoutedEventArgs e)
    {
        AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
    }

    public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
    {
        TstRst.Text = "InProgress..";
        allTsk.Clear();
        foreach (var data in DataLstSample)
        {
            var fName = fileNameRand;
            var t = Task.Run(async () =>
            {
                await AppIsolatedStore.SaveAsync<string>(data, fName);
            });
            TestResults.Add(string.Format("Writing file name: {0}, data: {1}", fName, data));
            allTsk.Add(t);
        }
        await Task.WhenAll(allTsk);
        TstRst.Text = "Completed..";
    }

Saving and loading data asynchronously

    /// <summary>
    /// Load object from file
    /// </summary>
    private static async Task<T> LoadData<T>(string fileName)
    {
        T result = default(T);
        try
        {
            if (!string.IsNullOrWhiteSpace(fileName))
            {
                using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
                {
                    var data = await _file.ReadTextAsync(file);
                    if (!string.IsNullOrWhiteSpace(data))
                    {
                        result = JsonConvert.DeserializeObject<T>(data);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            //todo: log the megatron exception in a file
            Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occured while loading data : {0}", ex.Message);
        }
        finally { }
        return result;
    }

    /// <summary>
    /// Save object to file
    /// </summary>
    private static async Task SaveData<T>(T obj, string fileName)
    {
        try
        {
            if (obj != null && !string.IsNullOrWhiteSpace(fileName))
            {
                //Serialize object with JSON or XML serializer
                string storageString = JsonConvert.SerializeObject(obj);
                if (!string.IsNullOrWhiteSpace(storageString))
                {
                    //Write content to file
                    await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
                }
            }
        }
        catch (Exception ex)
        {
            //todo: log the megatron exception in a file
            Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occured while saving the data : {0}", ex.Message);
        }
        finally { }
    }

Edit:

The out-of-memory exception occurs because the data string I used is too long. The string is available at this link: http://1drv.ms/1QWSAsc

The second problem remains: even with small data, the UI thread is blocked. Is the code running any task on the UI thread?

2 answers

No. You are using a ConcurrentExclusiveSchedulerPair, which runs its tasks on the default thread pool, and you create the tasks with Task.Run, so the problem is not there. But the code you have here has two main problems:

    var transBlock = new TransformBlock<string, T>(async fName =>
    {
        // process file here
    }, concurrentExecutionDataFlow);

You really should not create transBlock every time. The main idea of TPL Dataflow is that you create the blocks once and reuse them afterwards. You should therefore refactor your application to reduce the number of blocks you instantiate; otherwise this is not a case where TPL Dataflow should be used.

The other problem in your code is that you are explicitly blocking the thread!
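A minimal sketch of the "create the block once" idea follows. The `FileWriter` class and the `WriteRequest` type are hypothetical names that do not appear in the question, and the file write is simulated with an in-memory list so the sketch runs without isolated storage; in the phone project that line would presumably call the question's `SaveData` instead. One long-lived `ActionBlock` serializes all writes (its `MaxDegreeOfParallelism` defaults to 1), and each caller awaits a `TaskCompletionSource` that the block completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical request type: the payload plus a completion source the caller awaits.
public sealed class WriteRequest
{
    public string FileName;
    public string Content;
    public TaskCompletionSource<bool> Done = new TaskCompletionSource<bool>();
}

// Hypothetical wrapper that owns ONE block for the lifetime of the application.
public static class FileWriter
{
    // Stand-in for the real storage so the sketch runs anywhere.
    public static readonly List<string> Written = new List<string>();

    // Created once; MaxDegreeOfParallelism defaults to 1, so writes never overlap.
    private static readonly ActionBlock<WriteRequest> writeBlock =
        new ActionBlock<WriteRequest>(async request =>
        {
            try
            {
                await Task.Yield(); // placeholder for the real asynchronous write
                Written.Add(request.FileName + ":" + request.Content);
                request.Done.SetResult(true);
            }
            catch (Exception ex)
            {
                request.Done.SetException(ex);
            }
        });

    public static async Task SaveAsync(string fileName, string content)
    {
        var request = new WriteRequest { FileName = fileName, Content = content };
        // SendAsync returns false only if the block declined the request.
        if (!await writeBlock.SendAsync(request))
            throw new InvalidOperationException("The write block no longer accepts requests.");
        await request.Done.Task; // completes when the block has processed this request
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Five parallel callers share the same block instead of creating five blocks.
        var tasks = Enumerable.Range(0, 5)
            .Select(i => Task.Run(() => FileWriter.SaveAsync("file" + i, "data" + i)));
        await Task.WhenAll(tasks);
        Console.WriteLine(FileWriter.Written.Count);
    }
}
```

With this shape, the exclusive scheduler from the question is not needed for writes, because a single block with the default degree of parallelism already processes one request at a time.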

    // Right here
    await Task.WhenAll(allTsk);
    TstRst.Text = "Completed..";

Awaiting a task in an async void method that is called from a synchronous event handler blocks the thread, because by default await captures the synchronization context. First of all, async void should be avoided. Secondly, if you go asynchronous, you should be asynchronous all the way, so the event handler must be asynchronous too. Thirdly, you can use a continuation on your task to update the UI, or use the current synchronization context.

So your code should look something like this:

    // store the sync context in a field of your page
    SynchronizationContext syncContext = SynchronizationContext.Current;

    // avoid the async void :)
    public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
    {
        // ... start the save tasks as before ...

        // asynchronously wait for the result without capturing the context
        await Task.WhenAll(allTsk).ContinueWith(t =>
        {
            // you can move this logic out to the main method
            syncContext.Post(new SendOrPostCallback(o =>
            {
                TstRst.Text = "Completed..";
            }), null); // Post requires a state argument; null is enough here
        });
    }

    // make the event handler async - this is the only exception to the "avoid async void" rule above
    private async void Button_Click(object sender, RoutedEventArgs e)
    {
        await AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
    }

Have you tried playing with the BoundedCapacity parameter in ExecutionDataflowBlockOptions? The Introduction to TPL Dataflow mentions this about block capacity:

[...] bounding is useful in a dataflow network to avoid unbounded memory growth. This can be very important for reliability if there's the possibility that producers can generate data faster than consumers can process it ...

I'd suggest trying this option, which limits the queue of items waiting to be processed, and finding out whether it helps with your memory problems.
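How BoundedCapacity applies back-pressure can be sketched as follows. The `BoundedDemo` class and the `gate` used to keep the consumer busy are hypothetical and exist only for this illustration. A block with `BoundedCapacity = 1` declines `Post` while it still holds an item, whereas `SendAsync` waits asynchronously until there is room.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDemo
{
    public static async Task<Tuple<bool, bool, int>> RunAsync()
    {
        // Hypothetical gate that keeps the first item inside the block.
        var gate = new TaskCompletionSource<bool>();
        int processed = 0;

        var block = new ActionBlock<int>(async item =>
        {
            await gate.Task; // simulate a slow consumer
            processed++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstAccepted = block.Post(1);  // the block is empty, so the item fits
        bool secondAccepted = block.Post(2); // the block is full, so Post declines the item

        Task<bool> pending = block.SendAsync(3); // waits for room instead of giving up
        gate.SetResult(true);                    // let the consumer finish
        await pending;

        block.Complete();
        await block.Completion;
        return Tuple.Create(firstAccepted, secondAccepted, processed);
    }

    public static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine("first={0}, second={1}, processed={2}", r.Item1, r.Item2, r.Item3);
    }
}
```

The practical consequence is that a producer feeding a bounded block should either await `SendAsync` or check the return value of `Post`; otherwise declined items are silently dropped.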
