API data import for thousands of users using threads

In our application we need to import transaction data from PayPal through its API for each of our users and store it in the database. We have thousands of users (about 5k now), and the number grows every day.

The application is a .NET Windows service.

It imports data for all users on an hourly basis. Currently we import one user after another, but sometimes a single user's data is so large that it takes about 5 hours to fetch all of it, and every other user is blocked until that import completes. As a result, the hourly import for all the other users falls completely behind.

To avoid this, we thought about creating a thread for each user and starting them every hour from the Windows service. Since all the threads would start at the same time, we need to think about bandwidth usage. Is this a problem at all?

Now I want to know whether our new implementation is sound. I would also like to know how this is usually done. If anyone has built something similar, please tell us how you did it.

If my question is not clear enough, please let me know and I will provide additional information.

Edit: If I send this many requests to PayPal from one IP address, how does it handle them? Does it limit requests per IP address?

Update: Thanks for all the suggestions and feedback.

I was thinking about using jgauffin's solution, as it is a perfect ThreadPool simulation. But I need a few more features, such as changing the thread limit dynamically and calling the callback method recursively.

After much research and analysis of thread pools, I decided to use SmartThreadPool, which is based on thread pool logic but has more features. It works well and fully serves my purpose.

+4
5 answers

I would use a queue and, say, five threads for this. Each time a thread finishes a user, it takes the next user from the queue.

Code example:

    using System;
    using System.Collections.Generic;
    using System.Threading;

    public class Example
    {
        public static void Main(string[] argv)
        {
            // Setup
            DownloadQueue personQueue = new DownloadQueue();
            personQueue.JobTriggered += OnHandlePerson;
            // Can be changed at any time; it is checked whenever a new person is enqueued.
            personQueue.ThreadLimit = 10;

            // Enqueue as many persons as you like.
            personQueue.Enqueue(new Person("John", "Doe"));
            Console.ReadLine();
        }

        public static void OnHandlePerson(object source, PersonEventArgs e)
        {
            // Download the person here.
        }
    }

    public class DownloadQueue
    {
        Queue<Person> _queue = new Queue<Person>();
        int _runningThreads = 0;

        public int ThreadLimit { get; set; }

        /// <summary>
        /// Enqueue a new user.
        /// </summary>
        public void Enqueue(Person person)
        {
            lock (_queue)
            {
                _queue.Enqueue(person);
                if (_runningThreads < ThreadLimit)
                {
                    // Count the worker here, under the lock, so that a burst of
                    // Enqueue calls cannot start more than ThreadLimit workers.
                    ++_runningThreads;
                    ThreadPool.QueueUserWorkItem(DownloadUser);
                }
            }
        }

        /// <summary>
        /// Runs on a ThreadPool thread.
        /// </summary>
        private void DownloadUser(object state)
        {
            while (true)
            {
                Person person;
                lock (_queue)
                {
                    if (_queue.Count == 0)
                    {
                        --_runningThreads;
                        return; // Nothing more in the queue, so exit.
                    }
                    person = _queue.Dequeue();
                }
                JobTriggered(this, new PersonEventArgs(person));
            }
        }

        public event EventHandler<PersonEventArgs> JobTriggered = delegate { };
    }

    public class PersonEventArgs : EventArgs
    {
        Person _person;

        public PersonEventArgs(Person person)
        {
            _person = person;
        }

        public Person Person
        {
            get { return _person; }
        }
    }

    public class Person
    {
        public Person(string fName, string lName)
        {
            this.firstName = fName;
            this.lastName = lName;
        }

        public string firstName;
        public string lastName;
    }
+1

Do not use a thread per user. Queue a work item on the thread pool for each user instead. That way you get the best of both worlds: you avoid the overhead of 5000 threads and you gain more control over the load, since you can determine how many threads the ThreadPool uses to process work items.
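A minimal sketch of the work-item approach, assuming a hypothetical `importUser` delegate that stands in for the real PayPal import call (the class and method names here are illustrative, not from the original answer). Each user becomes one work item on the shared `ThreadPool`, and a `CountdownEvent` lets the caller wait for the whole batch:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PooledImport
{
    // Queues one work item per user and blocks until all of them have run.
    // importUser is a hypothetical placeholder for the real import call.
    public static void ImportAll(IList<int> userIds, Action<int> importUser)
    {
        if (userIds.Count == 0) return;

        using (var pending = new CountdownEvent(userIds.Count))
        {
            foreach (int userId in userIds)
            {
                int id = userId; // copy so each work item captures its own value
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { importUser(id); }
                    catch (Exception ex)
                    {
                        // One failing user must not take down the batch.
                        Console.Error.WriteLine("User {0} failed: {1}", id, ex.Message);
                    }
                    finally { pending.Signal(); }
                });
            }
            pending.Wait();
        }
    }
}
```

The pool itself decides how many items run at once. `ThreadPool.SetMaxThreads` can cap that number, but the cap applies to the whole process, so it also affects everything else that uses the pool.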

+2

I would start with a pool of threads (say 10) and let each thread import one user. When a thread is done, it takes the next item from the queue. You can use the existing ThreadPool class and queue all your import requests on it. You can control the maximum number of threads for the ThreadPool.

Creating thousands of threads is a bad idea for several reasons: it is too much for Windows, and, as you point out yourself, you could flood the network (or possibly the PayPal service).

For extreme scalability, you can use asynchronous I/O, which does not block a thread while the request is in progress, but that API has a steep learning curve and is probably not needed for your scenario.
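For comparison, here is a rough sketch of the asynchronous route. It assumes C# 5 and .NET 4.5 or later (for `async`/`await` and `SemaphoreSlim.WaitAsync`), which is newer than what the question may be running on, and a hypothetical `importUserAsync` delegate in place of a real asynchronous PayPal call. A `SemaphoreSlim` caps how many requests are in flight at once:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledImport
{
    // Runs importUserAsync for every user with at most maxConcurrent in flight.
    // importUserAsync is a hypothetical placeholder for an asynchronous API call.
    public static async Task ImportAllAsync(
        IEnumerable<int> userIds, Func<int, Task> importUserAsync, int maxConcurrent)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            List<Task> tasks = userIds.Select(async id =>
            {
                await gate.WaitAsync(); // wait for a free slot without blocking a thread
                try { await importUserAsync(id); }
                finally { gate.Release(); }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }
}
```

Because no thread is held while a request is pending, the concurrency limit can be chosen with PayPal and the network in mind rather than thread overhead.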

+2

Creating 5000 threads in code is not a good idea: it can slow the server down enormously and may even crash it.

What you need is load balancing.

If you are on the .NET platform, think about a solution based on MSMQ: queue the user requests, and then have a dispatcher that distributes them among the servers.

+1

I would avoid creating a thread for each user. That approach is not very scalable. I am assuming that the API has no mechanism for asynchronous downloads; if it does, that is probably the way to go.

The producer-consumer pattern may work well here. The idea is to create a fixed-size pool of threads that consume work items from a shared queue. It is probably best to avoid the ThreadPool in your case, because it is intended primarily for short-lived tasks. You do not want your long-running tasks to exhaust it, because it is used for many different things in the .NET BCL.

If you are using .NET 4.0, you can use BlockingCollection. There is also a backport available as part of Reactive Extensions. Here is roughly what your code would look like.

Note: you will have to harden the code yourself to make it more robust, shut down gracefully, and so on.

    using System.Collections.Concurrent;
    using System.Threading;

    public class Importer
    {
        private BlockingCollection<Person> m_Queue = new BlockingCollection<Person>();

        public Importer(int poolSize)
        {
            // Start a fixed number of dedicated worker threads.
            for (int i = 0; i < poolSize; i++)
            {
                var thread = new Thread(Download);
                thread.IsBackground = true;
                thread.Start();
            }
        }

        public void Add(Person person)
        {
            m_Queue.Add(person);
        }

        private void Download()
        {
            while (true)
            {
                // Take blocks until an item is available.
                Person person = m_Queue.Take();
                // Add your code for downloading this person's data here.
            }
        }
    }
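One possible way to add the graceful shutdown mentioned in the note, shown as a sketch rather than as the answer author's own code. The class name `DrainingImporter` and the `handler` delegate are illustrative assumptions. Workers loop over `GetConsumingEnumerable`, which ends once `CompleteAdding` has been called and the queue is empty:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public class DrainingImporter<T>
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly List<Thread> _workers = new List<Thread>();

    // handler is a hypothetical placeholder for the per-user download code.
    public DrainingImporter(int poolSize, Action<T> handler)
    {
        for (int i = 0; i < poolSize; i++)
        {
            var thread = new Thread(() =>
            {
                // Ends after CompleteAdding is called and the queue has drained.
                foreach (T item in _queue.GetConsumingEnumerable())
                {
                    try { handler(item); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message); }
                }
            });
            thread.IsBackground = true;
            thread.Start();
            _workers.Add(thread);
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    // Stops accepting work, then waits for the queued items to finish.
    public void Shutdown()
    {
        _queue.CompleteAdding();
        foreach (Thread worker in _workers) worker.Join();
        _queue.Dispose();
    }
}
```

A Windows service could call `Shutdown` from its stop handler so that imports already queued finish before the process exits. Calling `Add` after `Shutdown` throws, so producers need to stop first.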
+1
