What is a good method for handling line-based network I/O streams?

Note: Let me apologize up front for the length of this question; I had to put a lot of information into it. I hope that doesn't lead too many people to skim it and make assumptions. Please read it in full. Thanks.

I have a stream of data coming in over a socket. This data is line oriented.

I am using the APM (Asynchronous Programming Model) of .NET (BeginRead, etc.). This precludes using stream-based I/O, because async I/O is buffer based. It is possible to repackage the data and send it to a stream, such as a MemoryStream, but there are problems there too.

The problem is that my input stream (which I do not control) gives me no information about how long the stream is. It is simply a stream of newline-terminated lines, looking like this:

COMMAND\n ...Unpredictable number of lines of data...\n END COMMAND\n ....repeat.... 

So, using APM, and since I don't know how long any given data set will be, it is likely that blocks of data will cross buffer boundaries and require multiple reads, and a single read may in turn span multiple blocks of data.

Example:

 Byte buffer[1024] = ".................blah\nThis is another l" [another read] "ine\n.............................More Lines..." 

My first thought was to use a StringBuilder and simply append the buffer contents to it. This works to some extent, but I found it difficult to extract blocks of data. I tried using a StringReader to read the newly added data, but there was no way to know whether I was getting a complete line or not, since StringReader returns a partial line at the end of the last block added and returns null after that. There is no way to tell whether what was returned was a complete, newline-terminated line of data.

Example:

    // Note: no newline at the end
    StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
    StringReader sr = new StringReader(sb.ToString());
    string s = sr.ReadLine(); // returns "This is a line"
    s = sr.ReadLine();        // returns "This is incomp.."

What's worse, if I just keep appending data, the buffer gets bigger and bigger, and since this could run for weeks or months at a time, that is not a good solution.

My next thought was to remove blocks of data from the StringBuilder as I read them. This required writing my own ReadLine function, but then I got stuck on locking the data during reads and writes. Also, larger blocks of data (which can consist of hundreds of reads and megabytes of data) require scanning the entire buffer looking for newlines. It is inefficient and rather ugly.

I am looking for something that has the simplicity of a StreamReader/StreamWriter with the convenience of async I/O.

My next thought was to use a MemoryStream and write data blocks to a memory stream, and then attach the StreamReader to the stream and use ReadLine, but again I have problems understanding if the last read in the buffer is a complete line or not, plus it’s even harder to remove the "obsolete" data from the stream.

I also thought about using a thread with synchronous reads. This has the advantage that, with a StreamReader, ReadLine() will always return a full line, except in the event of a broken connection. However, this makes it hard to cancel the connection, and certain kinds of network problems can leave blocking sockets hung for an extended period of time. I am using async I/O because I do not want to tie up a thread for the life of the program, blocked waiting to receive data.

The connection is long-lived, and data will continue to flow over time. On the initial connection there is a large burst of data, and once that burst is done the socket remains open, waiting for real-time updates. I do not know exactly when the initial burst has "finished", since the only way to tell is that no more data is sent right away. This means I can't wait for the initial data load to complete before processing; I am pretty much stuck processing the data in real time as it arrives.

So, can anyone suggest a good method for handling this situation that isn't overly complicated? I really want this to be as simple and elegant as possible, but I keep coming up with more and more complicated solutions because of all the edge cases. I suppose what I want is some kind of FIFO to which I can easily keep appending data while at the same time pulling out data that matches certain criteria (i.e. newline-terminated lines).

+6
c# asynchronous network-programming
2 answers

This is quite an interesting question. My solution in the past has been to use a separate thread with synchronous operations, as you propose. (I managed to work around most of the problems with blocking sockets using locks and lots of exception handlers.) Still, using the built-in asynchronous operations is typically advisable, since it allows for true OS-level async I/O, so I understand your point.

Well, I went and wrote a class to accomplish what I believe you need (in a relatively clean manner, I would say). Let me know what you think.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text;

    public class AsyncStreamProcessor : IDisposable
    {
        protected StringBuilder _buffer;  // Buffer for unprocessed data.
        private bool _isDisposed = false; // True if object has been disposed.

        public AsyncStreamProcessor()
        {
            _buffer = null;
        }

        // Convenience overload for an array that is completely filled with new data.
        public IEnumerable<string> Process(byte[] newData)
        {
            return Process(newData, newData.Length);
        }

        // Pass the byte count returned by EndRead as 'count', so that stale bytes left
        // in the read buffer by an earlier, longer read are not decoded a second time.
        // This is an iterator: nothing runs until the result is enumerated, so enumerate
        // it exactly once, to the end, for every chunk of data.
        public IEnumerable<string> Process(byte[] newData, int count)
        {
            // Note: replace the following encoding method with whatever you are reading.
            // The trick here is to add an extra line break to the new data so that the
            // algorithm recognises a single line break at the end of the new data.
            using (var newDataReader = new StringReader(
                Encoding.ASCII.GetString(newData, 0, count) + Environment.NewLine))
            {
                // Read all lines from new data, returning all but the last.
                // The last line is guaranteed to be incomplete (or possibly complete
                // except for the line break, which will be processed with the next
                // packet of data).
                string line, prevLine = null;
                while ((line = newDataReader.ReadLine()) != null)
                {
                    if (prevLine != null)
                    {
                        yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                        _buffer = null;
                    }
                    prevLine = line;
                }

                // Store last incomplete line in buffer.
                if (_buffer == null)
                    // Note: the (* 2) gives you the prediction of the length of the
                    // incomplete line, so that the buffer does not have to be expanded
                    // in most/all situations. Change it to whatever seems appropriate.
                    _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
                else
                    _buffer.Append(prevLine);
            }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (!_isDisposed)
            {
                if (disposing)
                {
                    // Release managed resources. Forcing a collection with GC.Collect()
                    // is unnecessary here, so the buffer is simply dropped.
                    _buffer = null;
                }

                // Dispose native resources (none).

                // Remember that object has been disposed.
                _isDisposed = true;
            }
        }
    }

An instance of this class must be created for each NetworkStream, and the Process function must be called whenever new data is received (in the callback method for BeginRead, before you call the next BeginRead, I would assume).
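For what it's worth, here is a minimal sketch of the read loop described above, assuming a plain `Stream` (a `NetworkStream` in practice). `ApmReadLoop`, `onData` and `onClosed` are names made up for this illustration; they are not part of .NET. Each chunk is copied into an array sized to exactly the number of bytes read before it is handed to `onData`, which is where a call such as `Process(chunk)` would go.

```csharp
using System;
using System.IO;

// Hypothetical helper: pumps a stream with BeginRead/EndRead and hands every
// chunk, trimmed to the bytes actually read, to a caller-supplied callback.
public sealed class ApmReadLoop
{
    private readonly Stream _stream;
    private readonly byte[] _buffer;
    private readonly Action<byte[]> _onData; // called once per completed read
    private readonly Action _onClosed;       // called on end of stream or I/O failure

    public ApmReadLoop(Stream stream, int bufferSize, Action<byte[]> onData, Action onClosed)
    {
        _stream = stream;
        _buffer = new byte[bufferSize];
        _onData = onData;
        _onClosed = onClosed;
    }

    public void Start()
    {
        _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null);
    }

    private void OnRead(IAsyncResult ar)
    {
        int bytesRead;
        try
        {
            bytesRead = _stream.EndRead(ar);
        }
        catch (IOException) { _onClosed(); return; }
        catch (ObjectDisposedException) { _onClosed(); return; }

        if (bytesRead == 0)
        {
            // A zero-length read means the other side closed the connection.
            _onClosed();
            return;
        }

        // Copy only the bytes that were read; the rest of _buffer is stale.
        var chunk = new byte[bytesRead];
        Buffer.BlockCopy(_buffer, 0, chunk, 0, bytesRead);
        _onData(chunk);

        // Issue the next read only after this chunk has been processed, so the
        // callback never runs concurrently with itself.
        Start();
    }
}
```

Usage might look like `new ApmReadLoop(networkStream, 1024, chunk => { foreach (var line in processor.Process(chunk)) Handle(line); }, OnDisconnected).Start();`, where `networkStream`, `processor`, `Handle` and `OnDisconnected` are placeholders. If reads frequently complete synchronously, the nested `Start()` call can deepen the stack; checking `IAsyncResult.CompletedSynchronously` is one common way to guard against that.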

Note: I have only tested this code with test data, not with actual data transmitted over a network. However, I wouldn't anticipate any differences...

Also, a warning: the class is of course not thread-safe, but as long as BeginRead is not called again until the current data has been processed (as I presume you are doing), there should not be any problems.

Hope this works for you. Let me know if there are any remaining problems, and I will try to change the solution to deal with them. (There may be some subtlety of the question that I missed, despite the fact that I carefully read it!)

+5

What you describe in your question is very similar to ASCIZ strings. That may be a helpful start.

I had to write something similar in college for a project I was working on. Unfortunately for your case, I had control over the sending socket, so I included a message-length field as part of the protocol. Still, I think a similar approach may benefit you.

The way I approached it: I would send something like 5HELLO, so first I would see the 5 and know that the message I needed was 5 characters long. However, if my async read returned only 5HE, I would see that the message length was 5 but that only 3 bytes had arrived in total (assuming ASCII characters), so part of the message was still missing. Because of this, I knew I was short some bytes and stored what I had in a fragment buffer. I kept one fragment buffer per socket, which avoided any synchronization problems. Roughly, the process is:

  • Read from the socket into a byte array and record the number of bytes read.
  • Scan byte by byte until you find a newline character (this gets very complicated if you are receiving characters that can be multiple bytes rather than ASCII; you are on your own there).
  • Convert the fragment buffer to a string and append the read buffer contents up to the newline. Hand this string off as a completed message to a queue or to your own delegate for processing. (You can optimize these buffers by having the socket read write into the same byte array as the fragment, but that is harder to explain.)
  • Keep looping; every time you find a newline, create a string from the byte array between the recorded start and end positions and hand it to the queue/delegate for processing.
  • Once you hit the end of the read buffer, copy whatever is left over into the fragment buffer.
  • Call BeginRead on the socket, which takes you back to step 1 when data is available on the socket.
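The steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the implementation referred to in this answer: `LineSplitter` and `Feed` are made-up names, lines are assumed to end in a single `\n` byte, and the text is assumed to be ASCII.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical sketch of a per-socket fragment buffer. Feed() is called once per
// completed read and returns only the lines that are known to be complete.
public sealed class LineSplitter
{
    private byte[] _fragment = new byte[256]; // bytes of the unfinished last line
    private int _fragmentLength;

    public List<string> Feed(byte[] readBuffer, int bytesRead)
    {
        var lines = new List<string>();
        int start = 0; // start of the current line within readBuffer

        for (int i = 0; i < bytesRead; i++)
        {
            if (readBuffer[i] != (byte)'\n') continue;

            if (_fragmentLength == 0)
            {
                // The whole line lies inside this read: decode it in place.
                lines.Add(Encoding.ASCII.GetString(readBuffer, start, i - start));
            }
            else
            {
                // The line began in an earlier read: finish it in the fragment buffer.
                AppendToFragment(readBuffer, start, i - start);
                lines.Add(Encoding.ASCII.GetString(_fragment, 0, _fragmentLength));
                _fragmentLength = 0;
            }
            start = i + 1; // skip the newline itself
        }

        // Whatever follows the last newline is incomplete; keep it for the next read.
        AppendToFragment(readBuffer, start, bytesRead - start);
        return lines;
    }

    private void AppendToFragment(byte[] source, int offset, int count)
    {
        int needed = _fragmentLength + count;
        if (needed > _fragment.Length)
        {
            // Grow rather than overwrite when a read contains no newline at all.
            Array.Resize(ref _fragment, Math.Max(needed, _fragment.Length * 2));
        }
        Buffer.BlockCopy(source, offset, _fragment, _fragmentLength, count);
        _fragmentLength = needed;
    }
}
```

Because each read is scanned only once and completed lines are removed immediately, the buffer never holds more than one unfinished line. Scanning for the single byte `0x0A` should also remain safe for UTF-8 input, since that byte value never occurs inside a multi-byte sequence, but the decoding call would then need to change from `Encoding.ASCII` to `Encoding.UTF8`.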

Then you use another thread to read your queue of incoming messages, or just let the ThreadPool handle it using delegates, and do whatever data processing you need. Someone correct me if I am wrong, but there are very few thread synchronization issues with this, since you can only be reading, or waiting to read, from the socket at any one time, so there is no need to worry about locks (except when populating the queue; I used delegates in my implementation). There are a few details you will need to work out on your own, such as how large a fragment buffer to keep: if a read contains 0 newlines, the entire read must be appended to the fragment buffer without overwriting anything. I think it came to about 700 to 800 lines of code in the end, but that included the connection setup, negotiation for encryption, and a few other things.

This setup performed very well for me; I was able to push up to 80 Mbps on a 100 Mbps Ethernet LAN using this implementation on a 1.8 GHz Opteron, including encryption processing. And since everything is tied to the socket, the server will scale, because multiple sockets can be worked on at the same time. If you need items processed in order you will need to use a queue, but if order does not matter, delegates will give you very scalable performance out of the thread pool.
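If order matters, the queue hand-off might look something like the sketch below. It assumes .NET 4 or later for `BlockingCollection<T>`; `LineQueue` and its `handler` parameter are hypothetical names for this illustration and are not taken from the implementation described above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: the socket callback enqueues completed lines, and a single
// worker thread dequeues them, so lines are processed in arrival order.
public sealed class LineQueue : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Thread _worker;

    public LineQueue(Action<string> handler)
    {
        _worker = new Thread(() =>
        {
            // Blocks while the queue is empty; ends once CompleteAdding() has been
            // called and the remaining items have been drained.
            foreach (string line in _queue.GetConsumingEnumerable())
            {
                handler(line);
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    // Called from the read callback; returns quickly so the next read can be issued.
    public void Enqueue(string line)
    {
        _queue.Add(line);
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more lines will arrive
        _worker.Join();          // wait for the worker to drain the queue
        _queue.Dispose();
    }
}
```

The collection does its own locking, so the read callback and the worker need no extra synchronization. The trade-off is one dedicated thread per queue instead of thread-pool delegates.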

I hope this helps. It is not meant to be a complete solution, but rather a direction in which to start looking.

* Just a note: my implementation worked purely at the byte level and supported encryption; I used characters in my example to make it easier to visualize.

0