Real-time iteration application architecture

Sorry for the abstract question, but I'm looking for examples, tips, or articles about applications that perform the same operation in a loop, where each iteration must deliver its result within a fixed time window (for example, 10 seconds).

My application synchronizes data between an external WCF service and a local database. On each iteration, the application requests the changed data from the WCF service and writes those changes to the database, and vice versa. One of the hardest requirements for this application is that iterations must fire every ten seconds.

So here is the problem: how can I guarantee that an iteration finishes in no more than 10 seconds?

I assume that this type of application is called a real-time application (as in a real-time OS).

The DAL components that we use behave unpredictably with respect to connection timeouts, so DB operations can take longer than 10 seconds.

Here is sample code for one iteration:

    Stopwatch s1 = new Stopwatch();
    s1.Start();
    Parallel.ForEach(Global.config.databases, new ParallelOptions { MaxDegreeOfParallelism = -1 }, (l) =>
    {
        Console.WriteLine("Started for {0}", l.key.name);
        try // the opening `try` was missing from the posted snippet; its placement here is a reconstruction
        {
            DB db = new DB(l.connectionString);
            DateTime lastIterationTS = GetPreviousIterationTS(l.id);
            ExternalService serv = new ExternalService(l.id);
            // TotalSeconds rather than Seconds: Seconds is only the 0-59 component of the interval
            List<ChangedData> ChangedDataDb = db.GetChangedData(DateTime.Now.AddSeconds(
                (lastIterationTS == DateTime.MinValue) ? -300 : -1 * (DateTime.Now - lastIterationTS).TotalSeconds));
            List<Data> ChangedDataService = serv.GetModifiedData();

            Action syncDBChanges = new Action(() =>
            {
                foreach (ChangedData d in ChangedDataDb)
                {
                    try
                    {
                        // ...
                        // analyzing & syncing
                    }
                    catch (Exception e)
                    {
                        logger.InfoEx("Exception_SyncDatabase", e.ToString());
                    }
                }
            });
            Action syncService = new Action(() =>
            {
                foreach (Data d in ChangedDataService)
                {
                    try
                    {
                        // ...
                        // analyzing & syncing
                    }
                    catch (Exception e)
                    {
                        logger.InfoEx("Exception_SyncService", e.ToString());
                    }
                }
            });

            // Run both sync directions concurrently and wait at most (interval - 1) seconds
            List<WaitHandle> handles = new List<WaitHandle>();
            IAsyncResult ar1 = syncDBChanges.BeginInvoke(syncDBChanges.EndInvoke, null);
            IAsyncResult ar2 = syncService.BeginInvoke(syncService.EndInvoke, null);
            handles.Add(ar1.AsyncWaitHandle);
            handles.Add(ar2.AsyncWaitHandle);
            WaitHandle.WaitAll(handles.ToArray(), (int)((Global.config.syncModifiedInterval - 1) * 1000));
            SetCurrentIterationTS(l.id);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
            logger.InfoEx("Exception_Iteration", e.ToString());
            return; // the snippet had `continue`, which is not valid inside a lambda body
        }
        logger.InfoEx("end_Iteration", IterationContextParams);
    });
    s1.Stop();
    Console.WriteLine("Main iteration done for {0}...", s1.Elapsed);
5 answers

You can consider a couple of options:

  • Kill an iteration if it runs longer than 10 seconds and hope that the next iteration completes the process. The problem with this approach is that there is a good chance none of the iterations will ever finish, so the synchronization never happens. I would recommend the following option instead.

  • If the iteration takes more than 10 seconds, wait for it to complete and skip the next iteration(s). This way you guarantee that the process completes at least once. Below is simplified sample code for reference:

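    using System.Timers;

    class Updater
    {
        Timer timer = new Timer();
        public object StateLock = new object();
        public string State;

        public Updater()
        {
            timer.Elapsed += timer_Elapsed;
            timer.Interval = 10000; // 10 seconds
            timer.AutoReset = true;
            timer.Start();
        }

        void timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            // Check and set the state under one lock, so that two overlapping
            // timer callbacks cannot both start processing.
            lock (StateLock)
            {
                if (State == "Running") return; // previous iteration still running: skip this tick
                State = "Running";
            }
            Process();
        }

        private void Process()
        {
            try
            {
                // Process
            }
            finally
            {
                // Reset the state even if processing throws
                lock (StateLock) { State = ""; }
            }
        }
    }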

...

    class Program
    {
        static void Main(string[] args)
        {
            Updater updater = new Updater();
            Console.ReadLine();
        }
    }

Quartz.NET is a great scheduler for the .NET platform, which I think can meet your needs.

  • If you want to kill a job, you can implement IInterruptableJob. Put the cleanup code in the Interrupt method to release any DB connections.
  • If you want to let the job finish and only start another run once the previous one has completed (which I consider the better option), you can implement IStatefulJob.

I usually separate the update cycle from the actual timer.

The timer has two functions:

1) if the update is not running, start it.

2) if the update is already running, set the continuation flag.

Update cycle:

1) set the execution flag to true

2) update

3) set the execution flag to false

4) if the continuation flag is set, clear it and go to 1).
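
The steps above can be sketched roughly as follows. This is a minimal illustration, not the answerer's actual code: the class name CoalescingUpdater, the OnTick method, and the doUpdate delegate are all hypothetical, and the Thread.Sleep call merely stands in for one sync pass. Any number of ticks that arrive during a run collapse into a single follow-up run.

```csharp
using System;
using System.Threading;

// Hypothetical names throughout; "doUpdate" stands in for one sync pass.
public sealed class CoalescingUpdater
{
    private readonly object _gate = new object();
    private readonly Action _doUpdate;
    private bool _running;   // the "execution" flag
    private bool _runAgain;  // the "continuation" flag

    public CoalescingUpdater(Action doUpdate)
    {
        _doUpdate = doUpdate;
    }

    // Called by the timer on every tick.
    public void OnTick()
    {
        lock (_gate)
        {
            if (_running)
            {
                _runAgain = true; // remember that a tick was missed
                return;
            }
            _running = true;
        }
        RunLoop();
    }

    private void RunLoop()
    {
        while (true)
        {
            try
            {
                _doUpdate();
            }
            catch (Exception ex)
            {
                // Keep the flags consistent even if one pass fails.
                Console.Error.WriteLine(ex);
            }

            lock (_gate)
            {
                if (!_runAgain)
                {
                    _running = false;
                    return;
                }
                _runAgain = false; // clear the flag, then run one more pass
            }
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Simulated update takes 200 ms while the timer ticks every 100 ms.
        var updater = new CoalescingUpdater(() => Thread.Sleep(200));
        using (var timer = new Timer(_ => updater.OnTick(), null, 0, 100))
        {
            Thread.Sleep(1000);
        }
    }
}
```

Both flags are read and written under the same lock, so a tick that arrives just as an update finishes either starts a new run or is recorded as a continuation, and is never lost.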


You might want to read about the variety of Timer classes available in .NET: http://msdn.microsoft.com/en-us/magazine/cc164015.aspx

I personally like System.Threading.Timer because you can easily use lambdas, and it allows you to pass a state object if you create a separate callback.

I would also recommend using the System.Threading.Tasks library, because it lets you handle cancellation gracefully if the timer expires before your work is completed. MSDN example: http://msdn.microsoft.com/en-us/library/dd537607.aspx

Here is an example of using these components together with a 10-minute timer. Note: to do this with your SQL database, you need to set Asynchronous Processing=true; and MultipleActiveResultSets=True; in the connection string.

    CancellationTokenSource cancelSource = new CancellationTokenSource();
    System.Threading.Timer timer = new System.Threading.Timer(callback =>
    {
        //start sync
        Task syncTask = Task.Factory.StartNew(syncAction =>
        {
            using (SqlConnection conn = new SqlConnection(
                ConfigurationManager.ConnectionStrings["db"].ConnectionString))
            {
                conn.Open();
                using (SqlCommand syncCommand = new SqlCommand
                {
                    CommandText = "SELECT getdate() \n WAITFOR DELAY '00:11'; ",
                    CommandTimeout = 600,
                    Transaction = conn.BeginTransaction(),
                    Connection = conn
                })
                {
                    try
                    {
                        IAsyncResult t = syncCommand.BeginExecuteNonQuery();
                        SpinWait.SpinUntil(() => (t.IsCompleted || cancelSource.Token.IsCancellationRequested));
                        if (cancelSource.Token.IsCancellationRequested && !t.IsCompleted)
                            syncCommand.Transaction.Rollback();
                    }
                    catch (TimeoutException timeoutException)
                    {
                        syncCommand.Transaction.Rollback();
                        //log a failed sync attempt here
                        Console.WriteLine(timeoutException.ToString());
                    }
                    finally
                    {
                        syncCommand.Connection.Close();
                    }
                }
            }
        }, null, cancelSource.Token);

        //set up a timer for processing in the interim, save some time for rollback
        System.Threading.Timer spinTimer = new System.Threading.Timer(c =>
        {
            cancelSource.Cancel();
        }, null, TimeSpan.FromMinutes(9), TimeSpan.FromSeconds(5));

        //spin here until the spintimer elapses;
        //this is optional, but would be useful for debugging.
        SpinWait.SpinUntil(() => (syncTask.IsCompleted || cancelSource.Token.IsCancellationRequested));
        if (syncTask.IsCompleted || cancelSource.Token.IsCancellationRequested)
            spinTimer.Dispose();
    }, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(10));
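
A shorter variation on the same cancellation idea, shown only as a sketch: give each iteration a time budget through a CancellationTokenSource that cancels itself after a timeout. The names BudgetedIteration and RunAsync are hypothetical, and Task.Delay stands in for the real sync work. Cancellation here is cooperative, so a DAL call that blocks without observing the token will not be interrupted by this.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BudgetedIteration
{
    // Runs one iteration with a time budget.
    // Returns true if the work finished in time, false if it was cancelled.
    public static async Task<bool> RunAsync(Func<CancellationToken, Task> work, TimeSpan budget)
    {
        // This constructor schedules cancellation once the budget has elapsed.
        using (var cts = new CancellationTokenSource(budget))
        {
            try
            {
                await work(cts.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                // The work observed the token and stopped; roll back here if needed.
                return false;
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Task.Delay is a placeholder for the real sync; it honours the token.
        bool fast = await BudgetedIteration.RunAsync(
            ct => Task.Delay(50, ct), TimeSpan.FromMilliseconds(500));
        bool slow = await BudgetedIteration.RunAsync(
            ct => Task.Delay(5000, ct), TimeSpan.FromMilliseconds(100));
        Console.WriteLine("fast finished: {0}, slow finished: {1}", fast, slow);
    }
}
```

For a 10-second cycle, a budget of around 9 seconds would leave some room for rollback, in the same spirit as the 9-minute inner timer in the example above.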

Perhaps try this. Make sure that you are not creating or using any new threads in the DoWork() method.

    using System;
    using System.Collections.Generic;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Timers;
    using Timer = System.Timers.Timer; // disambiguate from System.Threading.Timer

    // DatabaseConfig is the poster's own type (not shown); it needs at least an Id property.
    class DatabaseUpdater
    {
        private readonly Timer _timer;
        private List<Thread> _threads;
        private readonly List<DatabaseConfig> _dbConfigs;

        public DatabaseUpdater(int seconds, List<DatabaseConfig> dbConfigs)
        {
            _timer = new Timer(seconds * 1000);
            _timer.Elapsed += TimerElapsed;
            _dbConfigs = dbConfigs;
        }

        public void Start()
        {
            StartThreads();
            _timer.Start();
        }

        public void Stop()
        {
            _timer.Stop();
            StopThreads();
        }

        // On every tick: abort whatever is still running, then start a fresh round.
        void TimerElapsed(object sender, ElapsedEventArgs e)
        {
            StopThreads();
            StartThreads();
        }

        private void StartThreads()
        {
            var newThreads = new List<Thread>();
            foreach (var config in _dbConfigs)
            {
                var thread = new Thread(DoWork);
                thread.Start(config);
                newThreads.Add(thread);
            }
            _threads = newThreads;
        }

        private void StopThreads()
        {
            if (_threads == null) return;
            var oldThreads = _threads;
            foreach (var thread in oldThreads)
            {
                // Thread.Abort works on .NET Framework only; on .NET Core / .NET 5+
                // it throws PlatformNotSupportedException.
                thread.Abort();
            }
        }

        static void DoWork(object objConfig)
        {
            var dbConfig = objConfig as DatabaseConfig;
            if (null == dbConfig) return;
            var n = GetRandomNumber();
            try
            {
                ConsoleWriteLine("Sync started for : {0} - {1} sec work.", dbConfig.Id, n);
                // update/sync db
                Thread.Sleep(1000 * n);
                ConsoleWriteLine("Sync finished for : {0} - {1} sec work.", dbConfig.Id, n);
            }
            catch (Exception)
            {
                // cancel/rollback db transaction (ThreadAbortException lands here)
                ConsoleWriteLine("Sync cancelled for : {0} - {1} sec work.", dbConfig.Id, n);
            }
        }

        static readonly Random Random = new Random();

        [MethodImpl(MethodImplOptions.Synchronized)]
        static int GetRandomNumber()
        {
            return Random.Next(5, 20);
        }

        [MethodImpl(MethodImplOptions.Synchronized)]
        static void ConsoleWriteLine(string format, params object[] arg)
        {
            Console.WriteLine(format, arg);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var configs = new List<DatabaseConfig>();
            for (var i = 1; i <= 3; i++)
            {
                configs.Add(new DatabaseConfig() { Id = i });
            }
            var databaseUpdater = new DatabaseUpdater(10, configs);
            databaseUpdater.Start();
            Console.ReadKey();
            databaseUpdater.Stop();
        }
    }