Design ideas for shared flow users?

Assume the following scenario:

We have a Java class from some library that consumes a stream of bytes, say, the XML parser XmlParser1 , which provides the xmlParser1.parse(inputStream) method; the method, as it is typical, eats all bytes in one call, eventually blocking. We also have another class from another library that does something similar, with a different implementation: XmlParser2 with xmlParser2.parse(inputStream) . Now we want to parse one stream with both parsers.

My first answer would be: we are screwed. Since we cannot control how each class consumes a stream, all we can do is buffer all bytes in memory or a temporary file (or, if possible, open or reopen it). The API of these consumers is inherently incompatible.

Now suppose we have control over XmlParser1 (implementation and signature), and we want to encode it in a more flexible and collaborative way so that the caller can implement the above behavior in some sensible and efficient way ... what do you suggest?

Some alternatives that I have been considering:

1) XmlParser1 implementation of FilterInputStream , so that when some class ( XmlParser1 ) tries to read some bytes from it, it internally parses what it needs (iteratively, possibly with some reasonable buffering), and also returns the raw bytes. (This is not consistent with the concept of FilterInputStream , I would say). Thus, client code can simply chain parsers:

  public class XmlParser1 extends FilterInputStream { public XmlParser1(InputStream rawInputStream) { ... } public int read(byte[] b, int off, int l) throws IOException { // this would invoke the underlying stream read, parse internall the read bytes, // and leave them in the buffer } } XmlParser1 parser1 = new XmlParser1(inputstream); XmlParser2 parser2 = new XmlParser2(parse); parser2.parse(); // parser2 consumes all the input stream, which causes parser1 to read an parse it too 

2) Instead of treating XmlParser1 as consumer bytes, consider it as a receiver : we will not allow it to eat bytes themselves, we - Put it. So, instead of xmlParser1.parse(inputStream) we could xmlParser1.write(byte[]) ... that is, instead of passing it to InputStream , we will make it an OutputStream . This would allow the client to create a TeeInputStream that transparently passes bytes to the XmlParser2 class and which calls XmlParser1.write() at the same time.

Please note that in any case, we do not need separate threads.

I'm not sure which one (if any) is conceptually preferable if there are better alternatives. It sounds to me like a design issue that should have already been discussed, but I have not found much - not necessarily limited to Java. Opinions and links are welcome.

+4
source share
5 answers

Assuming two parses work in two separate threads, it could be like (not working code)

 public class Test extends FilterInputStream { byte[] buf = new byte[8192]; int len; Thread thread = null; @Override public synchronized int read(byte[] b, int off, int l) throws IOException { while (thread == Thread.currentThread() && len > 0) { thread.wait(); } if (len > 0) { System.arraycopy(buf, 0, b, off, l); len = 0; return l; } len = super.read(b, off, l); System.arraycopy(b, off, buf, 0, len); thread = Thread.currentThread(); notify(); return len; } 

that is, # 1 reads the bytes and saves them in buf, the next attempt # 1 is blocked until # 2 reads everything from the buffer

+1
source

If your streams are on the same server, your idea of ​​splitting InputStreams will not make any sense. Since you will use only one InputStream and one BufferedInputStream to receive data, create objects from InputStreams, and then you use these objects in two different streams. Conclusion: You never need to block an InputStream at any time in Java. I would even consider it harmful, because if you block, what will happen if your buffer or pipe flows? Queue overflow!

( EDIT: If you want to stop the stream, you need to tell the sender so that he no longer sends any data. Or you do it like youtube, they cut the video into parts (i.e. 1 part for 1 minute) and only preload these parts at the same time, so stopping the video will not affect preloading at all, since it is only preaload if you reach a certain position on the timeline (for example, 45 seconds, 1 minute 45 seconds, 2 minutes 45 seconds, ao). Well, this actually only preloading technology and from utstvie real streaming Youtube so do not need to bother with packages fall.)

However, I still have some lines of pseudo code for you, client:

 BufferedOutputStream bos = new BufferedOutputStream(/*yourBasicInputStream*/); ObjectOutputStream oos = new ObjectOutputStream(bos); //Or use another wrapper oos.writeObject(yourObjectToSend); //Or use another parser: Look into the API: ObjectInputStream 

Class variables in the main thread controller (aka server):

 Thread thread1; //eg a GUI controller Thread thread2; //eg a DB controller 

Server (or another server thread launched by the server, both threads as parameters):

 BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/); ObjectInputStream ois = new ObjectInputStream(bis); //Or use another wrapper //now we use an interface MyNetObject implementing the method getTarget(), but //also an abstract class would be possible (with a more complex getTarget-method): MyNetObject netObject = (MyNetObject) ois.readObject(); //Or use another parser... if(netObject.getTarget()=="Thread1ClassANDThread2Class"){ thread1.activateSync(netObject); //notify... thread2.activateSync(netObject); //...both threads! } else if(netObject.getTarget()=="Thread1Class"){ thread1.activate(netObject); //give thread1 a notification } else if(netObject.getTarget()=="Thread2Class"){ thread2.activate(netObject); //give thread2 a notification } else {//do something else...} 

Do not forget to synchronize the "activateSync (netObject)" method, but only if you want to make any changes to the object (you do not need to synchronize the readings, only the records):

 public void activateSync(MyNetObject netObject){ synchronize(netObject){ //do whatever you wanna do with the object...the other thread will always get the actual object! } } 

It is easy, fast, consistent ... and fully object oriented. I hope you understand this idea .;)

UPDATE:

It is important to understand that threads or readers are actually also a parser. With one important difference: streams are (usually) network classes that are used to write and read any data - except characters. While the reader is used to read any text / characters. So your correct implementation would be this: Read the incoming packets with some streams, and then just save the data to the appropriate object. Then you have a generic object that you can use in any reader. If you only have the image to read, you can try the readUTF() parser in the ObjectInputStream class ( http://docs.oracle.com/javase/1.4.2/docs/api/java/io/ObjectInputStream.html ) gives the line:

 BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/); ObjectInputStream ois = new ObjectInputStream(bis); String string = ois.readUTF(); //Or another usable parser/method XmlParser1.read(string); //for reads there is... XmlParser2.read(string); //...no synchronisation needed! 

Now it remains only to teach the parser how to read this line. While the string of the object itself can be considered a "stream". If this does not work for you, just find another parser / method to create the sink object.

Please note that the solutions discussed here - using the ObjectInputStream class with the appropriate parser - work in many cases, as well as with big data (then you simply cut a 1 GB file into several object / line packets before sending the network, the same the same as torrents). But it will not work with video / audio streaming, where your package may drop, and in any case you need completely different solutions (this is science as such: http://www.google.ch/search?q=video+ stream + packet + drop ).

+1
source

I tried pulling the original input stream into Apache Commons TeeInputStream to create an OutputStream. http://commons.apache.org/io/api-release/org/apache/commons/io/input/TeeInputStream.html

As the OutputStream to be written by IeeInputStream, I used java PipedOutputStream.

I connected this PipedOutputStream to java PipedInputStream.

This allowed me to read TeeInputStream and PipedInputStream. Not sure if it will work for you, or if at least it will take the next step.

I created a ReaderThread class to check if I can read them in parallel:

  private static class ReaderThread extends Thread { InputStream inStream; int threadId; public ReaderThread(int threadId, InputStream inStream) { this.inStream = inStream; this.threadId = threadId; } @Override public void run() { try { int c = inStream.read(); while (c != -1) { System.out.println("From ("+threadId+ ") "+c); c = inStream.read(); } } catch (Exception e) { e.printStackTrace(); } } } 

Then it was called by the following code:

 InputStream inStream = new FileInputStream(fileName); PipedInputStream pipedInStream = new PipedInputStream(); OutputStream pipedOutStream = new PipedOutputStream(pipedInStream); TeeInputStream tin = new TeeInputStream(inStream, pipedOutStream); ReaderThread firstThread = new ReaderThread(1,tin); ReaderThread secondThread = new ReaderThread(2,pipedInStream); firstThread.start(); secondThread.start(); 
0
source

It's unclear what is going on inside XmlParse1 and XmlParser2, but assuming that they really care about the final XML data, not the InputStream bytes, I would switch to the StAX XMLEvent api. you can force both parses to implement the XMLEventConsumer . Then you have an external loop that analyzes the actual stream and passes events to consumers:

 public void parseXml(InputStream stream) { XMLEventReader reader = ...; // convert stream into XMLEventReader XMLConsumer[] consumers = new XMLConsumer[]{new XmlParser1(), new XmlParser2()}; while(reader.hasNext()) { XMLEvent event = reader.nextEvent(); for(XMLConsumer consumer : consumers) { consumer.add(event)); } } 
0
source

You said that "the API of these consumers is inherently incompatible." Therefore, do not try to make them, KEEP INSULATION and give them what they want. Separate input streams.

There is a stream that reads the real input stream and writes two output streams.

Then create input streams from these output streams, you can do this with streams

pipedInputStream1 = new PipedInputStream (pipedOutputStream1);

and

ByteArrayInputStream (((ByteArrayOutputStream) byteOutputStream1) .toByteArray ());

0
source

All Articles