Persistent HTTP streaming with Indy

I have a JSON-RPC service that for one of the requests returns a continuous stream of JSON objects.

That is:

 {id:'1'}
 {id:'2'}
 //30 minutes of no data
 {id:'3'}
 //...

Of course there is no Content-Length, because the stream is infinite.

I use my own TStream descendant to receive and analyze the data. But internally, TIdHttp buffers the data and doesn't pass it to me until RecvBufferSize bytes have been received.

This leads to:

 {id:'1'} //received
 {id:'2'} //buffered by Indy but not received
 //30 minutes of no data
 {id:'3'} //this is where Indy commits {id:'2'} to me

Obviously, this won't do, because a message that mattered 30 minutes ago should have been delivered 30 minutes ago.

I would like Indy to do exactly what sockets do: read up to RecvBufferSize bytes, or fewer if that is all the data available, and return immediately.

I found this discussion dating back to 2005, in which some poor soul tried to explain the problem to the Indy developers, but they did not understand it. (Read it, it's a sad sight.)

In any case, he worked around this by writing a custom IOHandler descendant, but that was back in 2005. Maybe there are ready-made solutions today?

+6
4 answers

Although using a TCP stream was an option, in the end I went with the original solution of writing a custom TIdIOHandlerStack descendant.

The motivation was that with TIdHTTP I know exactly what does not work and only need to fix that, whereas switching to the lower TCP level means new problems can arise.

Here's the code I'm using, and I'm going to discuss the key points here.

The new TIdStreamIoHandler should inherit from TIdIOHandlerStack.

Two functions need to be rewritten: ReadBytes and ReadStream:

 function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; AAppend: Boolean = True): integer; virtual;
 procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; AReadUntilDisconnect: Boolean = False); override;

Both are modified Indy functions that can be found in IdIOHandler.TIdIOHandler. In ReadBytes, the while clause should be replaced with a single ReadFromSource() call, so that TryReadBytes returns after reading up to AByteCount bytes in one go.

Building on this, ReadStream should handle all combinations of AByteCount (> 0, < 0) and AReadUntilDisconnect (true, false) to decide how long to keep reading in a loop, and write each piece of data to the stream as it comes from the socket.

Please note that ReadStream should not end prematurely, even in this streaming version, if the socket holds only a part of the requested data. It just needs to write that part to the stream immediately instead of caching it in FInputBuffer, and then block and wait for the next part of the data.

+2

Sounds to me like a WebSocket task, since your connection is no longer a simple HTTP request/response, but a stream of content.

Check out the WebSocket server implementations for Delphi for some code.

There is at least one based on Indy, by AsmProfiler.

AFAIK there are two kinds of flow in websockets: binary and text. I suspect your JSON stream is text content, from a websocket perspective.

Another option is to use long-polling or some older protocol that is more router-friendly. Once the connection switches to WebSocket mode, it is no longer standard HTTP, so some "sensible" packet inspection tools (on a corporate network) may identify it as a security attack (for example, DoS) and drop the connection.

+4

You do not need to write an IOHandler descendant; this is already possible with the TIdTCPClient class. It exposes a TIdIOHandler object that has methods for reading from the socket. These ReadXXX methods block until the requested data has been read or a timeout occurs. As long as the connection exists, ReadXXX can be called in a loop, and whenever it receives a new JSON object, it can pass it to the application logic.

Your example looks like all JSON objects have only one line. However, JSON objects can be multi-line, in which case client code must know how they are separated.
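
To illustrate the separation problem, here is a minimal sketch that assumes the objects are simply concatenated top-level {...} blocks, as in the question. ExtractJsonObjects is a hypothetical helper, not part of Indy: it scans whatever has been received so far, hands out every complete object, and keeps an incomplete tail for the next read. It tracks quoted strings so that braces inside string values are not counted.

```pascal
program SplitJsonDemo;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$APPTYPE CONSOLE}
uses
  SysUtils, Classes;

// Hypothetical helper: moves every complete top-level {...} object from
// Buffer into Objects and leaves any incomplete tail in Buffer.
procedure ExtractJsonObjects(var Buffer: string; Objects: TStrings);
var
  I, Depth, StartPos, Consumed: Integer;
  Quote: Char;       // #0 while outside a string literal
  Escaped: Boolean;  // True right after a backslash inside a string
begin
  Depth := 0;
  StartPos := 0;
  Consumed := 0;
  Quote := #0;
  Escaped := False;
  for I := 1 to Length(Buffer) do
  begin
    if Quote <> #0 then
    begin
      // Inside a string: only look for the closing quote.
      if Escaped then
        Escaped := False
      else if Buffer[I] = '\' then
        Escaped := True
      else if Buffer[I] = Quote then
        Quote := #0;
    end
    else
      case Buffer[I] of
        '"', '''':
          Quote := Buffer[I];
        '{':
          begin
            if Depth = 0 then
              StartPos := I;
            Inc(Depth);
          end;
        '}':
          if Depth > 0 then
          begin
            Dec(Depth);
            if Depth = 0 then
            begin
              // A top-level object just closed: emit it.
              Objects.Add(Copy(Buffer, StartPos, I - StartPos + 1));
              Consumed := I;
            end;
          end;
      end;
  end;
  // Drop everything up to the last complete object.
  Delete(Buffer, 1, Consumed);
end;

var
  Buf: string;
  Found: TStringList;
  I: Integer;
begin
  Found := TStringList.Create;
  try
    // First read: two complete objects and the start of a third.
    Buf := '{id:''1''} {id:''2''} {id:';
    ExtractJsonObjects(Buf, Found);
    // Second read: the rest of the third object arrives.
    Buf := Buf + '''3''}';
    ExtractJsonObjects(Buf, Found);
    for I := 0 to Found.Count - 1 do
      Writeln(Found[I]);
  finally
    Found.Free;
  end;
end.
```

In a read loop you would append each block of received text to the buffer and call the helper after every read. If the service separates objects differently (for example, one per line), a simpler line-based split would do.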


Update: in a similar Stack Overflow question (for .NET) about a "streaming" HTTP JSON service, the most popular solution used a lower-level TCP client instead of an HTTP client: Reading data from an open HTTP stream

+2

In fact, with chunked transfer encoding the length of each chunk is sent directly in front of the chunk's contents. Using this length, TIdHTTP's IOHandler reads one chunk at a time into the stream. The smallest meaningful unit is a chunk (packet), so there should be no need to read characters from it one by one, and therefore no need to change the IOHandler functions. The only problem is that TIdHTTP never hands the stream data over to the next step, because the stream is infinite: there is no final chunk. The solution is therefore to use TIdHTTP's OnWork event to read from the stream and then reset the stream position to zero to avoid overflow. For instance:

 // add an event handler to TIdHTTP
 IdHTTP.OnWork := IdHTTPWork;

 procedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
 begin
   .....
   ResponseStringStream.Position := 0;
   s := ResponseStringStream.ReadString(ResponseStringStream.Size); // this is the packet content
   ResponseStringStream.Clear;
   ...
 end;

 procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
 var
   source: string;
 begin
   .....
   source := RatesWorker.RatesURL + 'EUR_USD';
   RatesWorker.IdHTTP.Get(source, RatesWorker.ResponseStringStream);
 end;

However, overriding the Write() method in a custom TStream descendant may be the best solution for this kind of requirement.
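
A custom stream of that kind could look roughly like the sketch below. TForwardingStream, TChunkProc and HandleChunk are hypothetical names made up for illustration. The stream stores nothing: every block passed to Write is handed straight to a callback, so nothing accumulates and there is no position to reset. The demo calls Write by hand to stand in for the HTTP client.

```pascal
program StreamWriteDemo;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$APPTYPE CONSOLE}
uses
  SysUtils, Classes;

type
  // Hypothetical callback type: receives the raw bytes of one written block.
  TChunkProc = procedure(const AChunk: AnsiString);

  // Hypothetical write-only stream that forwards each block immediately.
  TForwardingStream = class(TStream)
  private
    FOnChunk: TChunkProc;
  public
    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
    property OnChunk: TChunkProc read FOnChunk write FOnChunk;
  end;

function TForwardingStream.Read(var Buffer; Count: Longint): Longint;
begin
  Result := 0; // write-only: there is never anything to read back
end;

function TForwardingStream.Write(const Buffer; Count: Longint): Longint;
var
  Chunk: AnsiString;
begin
  if (Count > 0) and Assigned(FOnChunk) then
  begin
    // Copy the raw bytes and pass them on at once instead of storing them.
    SetString(Chunk, PAnsiChar(@Buffer), Count);
    FOnChunk(Chunk);
  end;
  Result := Count; // report the whole block as consumed
end;

function TForwardingStream.Seek(const Offset: Int64; Origin: TSeekOrigin): Int64;
begin
  Result := 0; // nothing is stored, so position and size are always zero
end;

var
  Received: Integer = 0;

procedure HandleChunk(const AChunk: AnsiString);
begin
  Inc(Received);
  Writeln('chunk ', Received, ': ', AChunk);
end;

var
  S: TForwardingStream;
  Data: AnsiString;
begin
  S := TForwardingStream.Create;
  try
    S.OnChunk := HandleChunk;
    // Simulate two blocks arriving separately.
    Data := '{id:''1''}';
    S.Write(Data[1], Length(Data));
    Data := '{id:''2''}';
    S.Write(Data[1], Length(Data));
  finally
    S.Free;
  end;
end.
```

An instance of such a stream would be passed as the second argument to IdHTTP.Get in place of ResponseStringStream. How promptly Write is called still depends on how TIdHTTP reads the response, which is what the rest of this thread is about, and a block boundary is not guaranteed to coincide with a JSON object boundary.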

0
