C# Full-Duplex Asynchronous .NET Named Pipes

I am trying to create a full-duplex client-server communication scheme between two separate machines (only), where each endpoint (client or server) can send data at any time, asynchronously (a non-blocking pipe), and the other end will pick it up and read it.

I don't want answers that refer to any technology other than named pipes. I know about the other technologies, but I want an answer to this specific question. (I have seen this question posted many times on different forums, and I keep seeing answers that advise using some other technology. I think that borders on rude.)

I have read that named pipes must be one-way or they block, but I suspect that is wrong. I think pipes are socket-based, and I cannot imagine that the underlying socket would be one-way only.

Any answer to this question must address the following points to be really useful:

  • Answers must deal with asynchronous pipes; I cannot use a synchronous solution.
  • Answers must demonstrate, or at least allow for, pipes that stay OPEN. I'm tired of reading examples where the pipe is opened, a string is transmitted, and the pipe is closed immediately. I would like an answer that assumes the pipes stay open and carry a lot of junk at random times, over and over, without hanging.
  • A C#-based solution.

I am sorry to sound demanding and snotty, but after several days of scouring the Internet I still have not found a good example, and I do not want to use WCF. If you know the details and answer this well, I am sure this topic will be a real winner for years to come. I will post the answer myself if I figure it out.

If you are going to write in and say "you need to use two pipes", please explain why, and how you know it is true, because nothing I have read about this explains why that would be so.

Thanks!

+6
3 answers

You do not need to use two pipes. I found many answers on the net saying that you do. I dug around, stayed up all night, tried and tried again, and figured out how to do it. It is very simple, but you have to get everything right (especially the order of operations), or it just won't work. Another trick is to always make sure a read call is outstanding, or the pipe closes. Do not write before you know that somebody is reading. Do not start a read unless you have hooked up the event handler first. That kind of thing.

Here is the pipe class that I use. It is probably not robust enough to handle pipe errors, closes, and overflows.
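The ordering described above can be condensed into a small sketch. This is not the author's class (that follows below); it is an illustration under stated assumptions: the names `DuplexPipeSketch`, `RunAsync`, and the pipe name are hypothetical, newline-terminated messages are my own framing choice, and `ConnectAsync` / `WaitForConnectionAsync` are only available in newer .NET versions. Both ends post a read first, then both write on the same single pipe.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public static class DuplexPipeSketch
{
    // Reads bytes until '\n' (or until the remote end closes) and returns the text.
    static async Task<string> ReadLineAsync(PipeStream pipe)
    {
        var bytes = new List<byte>();
        var one = new byte[1];
        while (await pipe.ReadAsync(one, 0, 1) == 1 && one[0] != (byte)'\n')
            bytes.Add(one[0]);
        return Encoding.UTF8.GetString(bytes.ToArray());
    }

    // Writes one newline-terminated message.
    static Task WriteLineAsync(PipeStream pipe, string text)
    {
        byte[] data = Encoding.UTF8.GetBytes(text + "\n");
        return pipe.WriteAsync(data, 0, data.Length);
    }

    // Hypothetical demo: one InOut pipe, both ends in the same process.
    public static async Task<(string serverGot, string clientGot)> RunAsync(string pipeName)
    {
        using var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
        using var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut,
            PipeOptions.Asynchronous);

        // 1. The server starts waiting, 2. the client connects.
        Task waiting = server.WaitForConnectionAsync();
        await client.ConnectAsync();
        await waiting;

        // 3. Both ends post a read BEFORE anybody writes.
        Task<string> serverRead = ReadLineAsync(server);
        Task<string> clientRead = ReadLineAsync(client);

        // 4. Both ends write on the same pipe while their own reads are pending.
        await Task.WhenAll(
            WriteLineAsync(server, "hello from server"),
            WriteLineAsync(client, "hello from client"));

        return (await serverRead, await clientRead);
    }

    public static async Task Main()
    {
        var (serverGot, clientGot) = await RunAsync("duplex-sketch-pipe");
        Console.WriteLine("server got: " + serverGot);
        Console.WriteLine("client got: " + clientGot);
    }
}
```

In a long-lived connection, each end would post the next read as soon as the previous one completes, which is what the class below does with its repeated `StartReadingAsync` call.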

Well, I have no idea what is wrong here, but formatting is a bit off! Vvvv

    using System;
    using System.Diagnostics;
    using System.IO.Pipes;
    using System.Threading.Tasks;
    using Newtonsoft.Json; // JsonConvert

    // Note: Misc.StringToBytes / Misc.BytesToString are the author's own helpers (not shown here).
    namespace Squall
    {
        public interface PipeSender
        {
            Task SendCommandAsync(PipeCommandPlusString pCmd);
        }

        /******************************************************************************
         *
         ******************************************************************************/
        public class ClientPipe : BasicPipe
        {
            NamedPipeClientStream m_pPipe;

            public ClientPipe(string szServerName, string szPipeName) : base("Client")
            {
                m_szPipeName = szPipeName; // debugging
                m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
                base.SetPipeStream(m_pPipe); // inform base class what to read/write from
            }

            public void Connect()
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
                m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
                StartReadingAsync();
            }

            // the client pipe index is always 0
            internal override int PipeId() { return 0; }
        }

        /******************************************************************************
         *
         ******************************************************************************/
        public class ServerPipe : BasicPipe
        {
            public event EventHandler<EventArgs> GotConnectionEvent;

            NamedPipeServerStream m_pPipe;
            int m_nPipeId;

            public ServerPipe(string szPipeName, int nPipeId) : base("Server")
            {
                m_szPipeName = szPipeName;
                m_nPipeId = nPipeId;
                m_pPipe = new NamedPipeServerStream(
                    szPipeName,
                    PipeDirection.InOut,
                    NamedPipeServerStream.MaxAllowedServerInstances,
                    PipeTransmissionMode.Message,
                    PipeOptions.Asynchronous);
                base.SetPipeStream(m_pPipe);
                m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
            }

            static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
            {
                ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
                pThis.GotPipeConnection(pAsyncResult);
            }

            void GotPipeConnection(IAsyncResult pAsyncResult)
            {
                m_pPipe.EndWaitForConnection(pAsyncResult);

                Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");

                if (GotConnectionEvent != null)
                {
                    GotConnectionEvent(this, new EventArgs());
                }

                // lodge the first read request to get us going
                //
                StartReadingAsync();
            }

            internal override int PipeId() { return m_nPipeId; }
        }

        /******************************************************************************
         *
         ******************************************************************************/
        public abstract class BasicPipe : PipeSender
        {
            public static int MaxLen = 1024 * 1024; // why not

            protected string m_szPipeName;
            protected string m_szDebugPipeName;

            public event EventHandler<PipeEventArgs> ReadDataEvent;
            public event EventHandler<EventArgs> PipeClosedEvent;

            protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];

            PipeStream m_pPipeStream;

            public BasicPipe(string szDebugPipeName)
            {
                m_szDebugPipeName = szDebugPipeName;
            }

            protected void SetPipeStream(PipeStream p)
            {
                m_pPipeStream = p;
            }

            protected string FullPipeNameDebug()
            {
                return m_szDebugPipeName + "-" + m_szPipeName;
            }

            internal abstract int PipeId();

            public void Close()
            {
                m_pPipeStream.WaitForPipeDrain();
                m_pPipeStream.Close();
                m_pPipeStream.Dispose();
                m_pPipeStream = null;
            }

            // called when Server pipe gets a connection, or when Client pipe is created
            public void StartReadingAsync()
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");

                // okay we're connected, now immediately listen for incoming buffers
                //
                byte[] pBuffer = new byte[MaxLen];
                m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
                {
                    Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");

                    int ReadLen = t.Result;
                    if (ReadLen == 0)
                    {
                        Debug.WriteLine("Got a null read length, remote pipe was closed");
                        if (PipeClosedEvent != null)
                        {
                            PipeClosedEvent(this, new EventArgs());
                        }
                        return;
                    }

                    if (ReadDataEvent != null)
                    {
                        ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
                    }
                    else
                    {
                        Debug.Assert(false, "something happened");
                    }

                    // lodge ANOTHER read request
                    //
                    StartReadingAsync();
                });
            }

            protected Task WriteByteArray(byte[] pBytes)
            {
                // this will start writing, but does it copy the memory before returning?
                return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
            }

            public Task SendCommandAsync(PipeCommandPlusString pCmd)
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
                string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
                byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
                Task t = WriteByteArray(pSerializedCmd);
                return t;
            }
        }

        /******************************************************************************
         *
         ******************************************************************************/
        public class PipeEventArgs
        {
            public byte[] m_pData;
            public int m_nDataLen;

            public PipeEventArgs(byte[] pData, int nDataLen)
            {
                // is this a copy, or an alias copy? I can't remember right now.
                m_pData = pData;
                m_nDataLen = nDataLen;
            }
        }

        /******************************************************************************
         * if we're just going to send a string back and forth, then we can use this
         * class. It allows us to get the bytes as a string. sort of silly.
         ******************************************************************************/
        [Serializable]
        public class PipeCommandPlusString
        {
            public string m_szCommand; // must be public to be serialized
            public string m_szString;  // ditto

            public PipeCommandPlusString(string sz, string szString)
            {
                m_szCommand = sz;
                m_szString = szString;
            }

            public string GetCommand() { return m_szCommand; }
            public string GetTransmittedString() { return m_szString; }
        }
    }
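On the question left open in the `PipeEventArgs` comment: in C#, assigning a `byte[]` stores a reference to the same array, so `m_pData` is an alias, not a copy. Since `StartReadingAsync` allocates a fresh buffer for every read, the alias is harmless in the class above; a copy only matters if the read buffer were reused. The following is a minimal sketch of the difference, with hypothetical names (`BufferAliasSketch`, `CopiedPipeEventArgs`) that are not part of the original code. As far as I know, the same caution applies to `WriteAsync`: it is safest to leave the array alone until the returned task has completed.

```csharp
using System;

public static class BufferAliasSketch
{
    // Hypothetical variant of PipeEventArgs that keeps a private copy of the data.
    public sealed class CopiedPipeEventArgs : EventArgs
    {
        public byte[] Data { get; }

        public CopiedPipeEventArgs(byte[] source, int length)
        {
            Data = new byte[length];
            Buffer.BlockCopy(source, 0, Data, 0, length); // copy only the bytes actually read
        }
    }

    public static void Main()
    {
        byte[] readBuffer = { 1, 2, 3, 4 };

        byte[] alias = readBuffer;                          // same array, second reference
        var copied = new CopiedPipeEventArgs(readBuffer, 2); // independent 2-byte copy

        readBuffer[0] = 99; // simulate the buffer being overwritten by a later read

        Console.WriteLine(alias[0]);           // prints 99: the alias sees the change
        Console.WriteLine(copied.Data[0]);     // prints 1: the copy does not
        Console.WriteLine(copied.Data.Length); // prints 2
    }
}
```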

And here is my pipe test, running in a single process. It also works across two processes; I checked.

    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    using Newtonsoft.Json; // JsonConvert
    using Squall;

    namespace NamedPipeTest
    {
        public partial class Form1 : Form
        {
            SynchronizationContext _context;
            Thread m_pThread = null;
            volatile bool m_bDieThreadDie;
            ServerPipe m_pServerPipe;
            ClientPipe m_pClientPipe;

            public Form1()
            {
                InitializeComponent();
            }

            private void Form1_Load(object sender, EventArgs e)
            {
                _context = SynchronizationContext.Current;

                m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
                m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
                m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;

                // uncomment these two lines to run the client thread inside this same process
                // m_pThread = new Thread(StaticThreadProc);
                // m_pThread.Start( this );
            }

            private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
            {
                Debug.WriteLine("Server: Pipe was closed, shutting down");

                // have to post this on the main thread
                _context.Post(delegate { Close(); }, null);
            }

            private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
            {
                // this gets called on an anonymous thread
                byte[] pBytes = e.m_pData;
                // use the number of bytes actually read, not the size of the whole buffer
                string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
                PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
                string szValue = pCmd.GetTransmittedString();

                if (szValue == "CONNECT")
                {
                    Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
                    PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
                    // fire off an async write
                    Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
                }
            }

            static void StaticThreadProc(Object o)
            {
                Form1 pThis = o as Form1;
                pThis.ThreadProc();
            }

            void ThreadProc()
            {
                m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
                m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
                m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
                m_pClientPipe.Connect();

                PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
                int Counter = 1;
                while (Counter++ < 10)
                {
                    Debug.WriteLine("Counter = " + Counter);
                    m_pClientPipe.SendCommandAsync(pCmd);
                    Thread.Sleep(3000);
                }

                while (!m_bDieThreadDie)
                {
                    Thread.Sleep(1000);
                }

                m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
                m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
                m_pClientPipe.Close();
                m_pClientPipe = null;
            }

            private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
            {
                // wait around for server to shut us down
            }

            private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
            {
                byte[] pBytes = e.m_pData;
                string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
                PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
                string szValue = pCmd.GetTransmittedString();

                Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());

                if (szValue == "CONNECTED")
                {
                    PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
                    m_pClientPipe.SendCommandAsync(pCmdToSend);
                }
            }

            private void Form1_FormClosing(object sender, FormClosingEventArgs e)
            {
                if (m_pThread != null)
                {
                    m_bDieThreadDie = true;
                    m_pThread.Join();
                    m_bDieThreadDie = false;
                }

                m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
                m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
                m_pServerPipe.Close();
                m_pServerPipe = null;
            }
        }
    }
+6

I think that when you use asynchronous communication, you need to use two pipes.

One of them is the receive pipe, the other is the send pipe,

because you do not know when data will come back.

While you are sending data on a pipe, received data cannot be written to that pipe.

Conversely, while data is being received, you cannot write outgoing data to the pipe.

Therefore, for asynchronous communication, you need two pipes.

0

Just create the pipe as overlapped (PipeOptions.Asynchronous), and your code can block on a read in one thread while writing to the pipe from another.

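    // reader and writer are assumed to be fields of the form; the original snippet did not show their declarations
    StreamReader reader;
    StreamWriter writer;

    void StartServer()
    {
        // wait for the client on a background task so the UI thread stays free
        Task.Factory.StartNew(() =>
        {
            var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.InOut, 1,
                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
            server.WaitForConnection();
            reader = new StreamReader(server);
            writer = new StreamWriter(server);
        });
    }

    private async void timer1_Tick(object sender, EventArgs e)
    {
        timer1.Stop(); // avoid overlapping ticks while a read is pending
        if (null != reader)
        {
            char[] buf = new char[50];
            int count = await reader.ReadAsync(buf, 0, 50);
            if (0 < count)
            {
                m_textBox_from.Text = new string(buf, 0, count);
            }
        }
        timer1.Start();
    }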
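To make the claim above concrete, here is a rough sketch in which one thread sits in a blocking read on the server end while another thread writes to that same server stream. The class name `OverlappedPipeSketch`, the method `Run`, the pipe name, and the message texts are all hypothetical; the only thing taken from the answer is the idea of opening the pipe with `PipeOptions.Asynchronous`.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;

public static class OverlappedPipeSketch
{
    // Returns the line the server's reader thread received.
    public static string Run(string pipeName)
    {
        using (var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
                   PipeTransmissionMode.Byte, PipeOptions.Asynchronous))
        using (var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut,
                   PipeOptions.Asynchronous))
        {
            // Connect the two ends (client on a helper thread, server on this one).
            var connecting = new Thread(() => client.Connect());
            connecting.Start();
            server.WaitForConnection();
            connecting.Join();

            // Thread 1: blocking read on the server end.
            string received = null;
            var readerThread = new Thread(() =>
            {
                var reader = new StreamReader(server);
                received = reader.ReadLine(); // blocks until the client replies
            });
            readerThread.Start();

            // This thread: write to the same server stream, most likely while
            // the read above is already blocked.
            var serverWriter = new StreamWriter(server) { AutoFlush = true };
            serverWriter.WriteLine("ping from server");

            // The client only replies after it has seen the server's write, so the
            // server's read cannot finish unless that write got through.
            var clientReader = new StreamReader(client);
            string ping = clientReader.ReadLine();
            var clientWriter = new StreamWriter(client) { AutoFlush = true };
            clientWriter.WriteLine("reply to: " + ping);

            readerThread.Join();
            return received;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run("overlapped-sketch-pipe"));
    }
}
```

Because the client's reply depends on the server's write arriving first, the sketch would hang if the pending read prevented the write on the same pipe.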
0
