一个进程间同步和通讯的C#框架

threadmsg_demo.zip ~ 41KB    下载

站在用户的角度思考问题,与客户深入沟通,找到贵港网站设计与贵港网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:成都做网站、网站建设、企业官网、英文网站、手机端网站、网站推广、国际域名空间、雅安服务器托管、企业邮箱。业务覆盖贵港地区。

threadmsg_src.zip ~ 65KB    下载

0.背景简介

微软在 .NET 框架中提供了多种实用的线程同步手段,其中包括 monitor 类及 reader-writer锁。但跨进程的同步方法还是非常欠缺。另外,目前也没有方便的线程间及进程间传递消息的方法。例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息。为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯。这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到 www.cdrnet.net/projects/threadmsg/.

这个框架的目的是:

1.封装性:通过MSMQ消息队列发送消息的线程无需关心消息是发送到另一个线程还是另一台机器。

2.简单性:向其他进程发送消息只需调用一个方法。

注意:我删除了本文中全部代码的XML注释以节省空间。如果你想知道这些方法和参数的详细信息,请参考附件中的代码。

1.先看一个简单例子

使用了这个库后,跨进程的消息传递将变得非常简单。我将用一个小例子来作示范:一个控制台程序,根据参数可以作为发送方也可以作为接收方运行。在发送程序里,你可以输入一定的文本并发送到信箱内(返回key),接收程序将显示所有从信箱内收到的消息。你可以运行无数个发送程序和接收程序,但是每个消息只会被具体的某一个接收程序所收到。

 
 
 
 
  1. [Serializable] 
  2. struct Message 
  3.   public string Text; 
  4.  
  5. class Test 
  6.   IMailBox mail; 
  7.  
  8.   public Test() 
  9.   { 
  10.     mail = new ProcessMailBox("TMProcessTest",1024); 
  11.   } 
  12.  
  13.   public void RunWriter() 
  14.   { 
  15.     Console.WriteLine("Writer started"); 
  16.     Message msg; 
  17.     while(true) 
  18.     { 
  19.       msg.Text = Console.ReadLine(); 
  20.       if(msg.Text.Equals("exit")) 
  21.         break; 
  22.       mail.Content = msg; 
  23.     } 
  24.   } 
  25.  
  26.   public void RunReader() 
  27.   { 
  28.     Console.WriteLine("Reader started"); 
  29.     while(true) 
  30.     { 
  31.       Message msg = (Message)mail.Content; 
  32.       Console.WriteLine(msg.Text); 
  33.     } 
  34.   } 
  35.  
  36.   [STAThread] 
  37.   static void Main(string[] args) 
  38.   { 
  39.     Test test = new Test(); 
  40.     if(args.Length > 0) 
  41.       test.RunWriter(); 
  42.     else 
  43.       test.RunReader(); 
  44.   } 

信箱一旦创建之后(这上面代码里是 ProcessMailBox ),接收消息只需要读取 Content 属性,发送消息只需要给这个属性赋值。当没有数据时,获取消息将会阻塞当前线程;发送消息时如果信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,整个程序是完全基于中断的,并且不会过度占用CPU(不需要进行轮询)。发送和接收的消息可以是任意支持序列化(Serializable)的类型。

然而,实际上暗地里发生的事情有点复杂:消息通过内存映射文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在 pagefile 里面产生虚拟文件。对这个虚拟文件的访问是通过 win32 信号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需要声明Serializable属性。内存映射文件和 win32 信号量都需要调用 NT内核的方法。多得了 .NET 框架中的 Marshal 类,我们可以避免编写不安全的代码。我们将在下面讨论更多的细节。

#p#

2. .NET里面的跨线程/进程同步

线程/进程间的通讯需要共享内存或者其他内建机制来发送/接收数据。即使是采用共享内存的方式,也还需要一组同步方法来允许并发访问。

同一个进程内的所有线程都共享公共的逻辑地址空间(堆)。对于不同进程,从 win2000 开始就已经无法共享内存。然而,不同的进程可以读写同一个文件。WinAPI提供了多种系统调用方法来映射文件到进程的逻辑空间,及访问系统内核对象(会话)指向的 pagefile 里面的虚拟文件。无论是共享堆,还是共享文件,并发访问都有可能导致数据不一致。我们就这个问题简单讨论一下,该怎样确保线程/进程调用的有序性及数据的一致性。

2.1 线程同步

.NET 框架和 C# 提供了方便直观的线程同步方法,即 monitor 类和 lock 语句(本文将不会讨论 .NET 框架的互斥量)。对于线程同步,虽然本文提供了其他方法,我们还是推荐使用 lock 语句。

 
 
 
 
  1. void Work1() 
  2.   NonCriticalSection1(); 
  3.   Monitor.Enter(this); 
  4.   try 
  5.   { 
  6.     CriticalSection(); 
  7.   } 
  8.   finally 
  9.   { 
  10.     Monitor.Exit(this); 
  11.   } 
  12.   NonCriticalSection2(); 

Work1 和 Work2 是等价的。在C#里面,很多人喜欢第二个方法,因为它更短,且不容易出错。

2.2 跨线程信号量

信号量是经典的同步基本概念之一(由 Edsger Dijkstra 引入)。信号量是指一个有计数器及两个操作的对象。它的两个操作是:获取(也叫P或者等待),释放(也叫V或者收到信号)。信号量在获取操作时如果计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。虽然信号量的原理很简单,但是实现起来有点麻烦。好在,内建的 monitor 类有阻塞特性,可以用来实现信号量。

 
 
 
 
  1. public sealed class ThreadSemaphore : ISemaphore 
  2.   private int counter; 
  3.   private readonly int max; 
  4.  
  5.   public ThreadSemaphore() : this(0, int.Max) {} 
  6.   public ThreadSemaphore(int initial) : this(initial, int.Max) {} 
  7.   public ThreadSemaphore(int initial, int max) 
  8.   { 
  9.     this.counter = Math.Min(initial,max); 
  10.     this.max = max; 
  11.   } 
  12.  
  13.   public void Acquire() 
  14.   { 
  15.     lock(this) 
  16.     { 
  17.       counter--; 
  18.       if(counter < 0 && !Monitor.Wait(this)) 
  19.         throw new SemaphoreFailedException(); 
  20.     } 
  21.   } 
  22.  
  23.   public void Acquire(TimeSpan timeout) 
  24.   { 
  25.     lock(this) 
  26.     { 
  27.       counter--; 
  28.       if(counter < 0 && !Monitor.Wait(this,timeout)) 
  29.         throw new SemaphoreFailedException(); 
  30.     } 
  31.   } 
  32.  
  33.   public void Release() 
  34.   { 
  35.     lock(this) 
  36.     { 
  37.       if(counter >= max) 
  38.         throw new SemaphoreFailedException(); 
  39.       if(counter < 0) 
  40.         Monitor.Pulse(this); 
  41.       counter++; 
  42.     } 
  43.   } 

信号量在复杂的阻塞情景下更加有用,例如我们后面将要讨论的通道(channel)。你也可以使用信号量来实现临界区的排他性(如下面的 Work3),但是我还是推荐使用内建的 lock 语句,像上面的 Work2 那样。

请注意:如果使用不当,信号量也是有潜在危险的。正确的做法是:当获取信号量失败时,千万不要再调用释放操作;当获取成功时,无论发生了什么错误,都要记得释放信号量。遵循这样的原则,你的同步才是正确的。Work3 中的 finally 语句就是为了保证正确释放信号量。注意:获取信号量( s.Acquire() )的操作必须放到 try 语句的外面,只有这样,当获取失败时才不会调用释放操作。

 
 
 
 
  1. ThreadSemaphore s = new ThreadSemaphore(1); 
  2. void Work3() 
  3.   NonCriticalSection1(); 
  4.   s.Acquire(); 
  5.   try 
  6.   { 
  7.     CriticalSection(); 
  8.   } 
  9.   finally 
  10.   { 
  11.     s.Release(); 
  12.   } 
  13.   NonCriticalSection2(); 

2.3 跨进程信号量

为了协调不同进程访问同一资源,我们需要用到上面讨论过的概念。很不幸,.NET 中的 monitor 类不可以跨进程使用。但是,win32 API提供的内核信号量对象可以用来实现跨进程同步。 Robin Galloway-Lunn 介绍了怎样将 win32 的信号量映射到 .NET 中(见 Using Win32 Semaphores in C# )。我们的实现也类似:

 
 
 
 
  1. [DllImport("kernel32",EntryPoint="CreateSemaphore", 
  2.      SetLastError=true,CharSet=CharSet.Unicode)] 
  3. internal static extern uint CreateSemaphore( 
  4.   SecurityAttributes auth, int initialCount, 
  5.     int maximumCount, string name); 
  6.  
  7. [DllImport("kernel32",EntryPoint="WaitForSingleObject", 
  8.  SetLastError=true,CharSet=CharSet.Unicode)] 
  9. internal static extern uint WaitForSingleObject( 
  10.  uint hHandle, uint dwMilliseconds); 
  11.  
  12. [DllImport("kernel32",EntryPoint="ReleaseSemaphore", 
  13.  SetLastError=true,CharSet=CharSet.Unicode)] 
  14. [return : MarshalAs( UnmanagedType.VariantBool )] 
  15. internal static extern bool ReleaseSemaphore( 
  16.   uint hHandle, int lReleaseCount, out int lpPreviousCount); 
  17.      
  18. [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, 
  19.   CharSet=CharSet.Unicode)] 
  20. [return : MarshalAs( UnmanagedType.VariantBool )] 
  21. internal static extern bool CloseHandle(uint hHandle); 
 
 
 
 
  1. public class ProcessSemaphore : ISemaphore, IDisposable 
  2.   private uint handle; 
  3.   private readonly uint interruptReactionTime; 
  4.  
  5.   public ProcessSemaphore(string name) : this( 
  6.    name,0,int.MaxValue,500) {} 
  7.   public ProcessSemaphore(string name, int initial) : this( 
  8.    name,initial,int.MaxValue,500) {} 
  9.   public ProcessSemaphore(string name, int initial, 
  10.    int max, int interruptReactionTime) 
  11.   {        
  12.     this.interruptReactionTime = (uint)interruptReactionTime; 
  13.     this.handle = NTKernel.CreateSemaphore(null, initial, max, name); 
  14.     if(handle == 0) 
  15.       throw new SemaphoreFailedException(); 
  16.   } 
  17.  
  18.   public void Acquire() 
  19.   { 
  20.     while(true) 
  21.     { //looped 0.5s timeout to make NT-blocked threads interruptable. 
  22.       uint res = NTKernel.WaitForSingleObject(handle,  
  23.        interruptReactionTime); 
  24.       try {System.Threading.Thread.Sleep(0);}  
  25.       catch(System.Threading.ThreadInterruptedException e) 
  26.       { 
  27.         if(res == 0) 
  28.         { //Rollback  
  29.           int previousCount; 
  30.           NTKernel.ReleaseSemaphore(handle,1,out previousCount); 
  31.         } 
  32.         throw e; 
  33.       } 
  34.       if(res == 0) 
  35.         return; 
  36.       if(res != 258) 
  37.         throw new SemaphoreFailedException(); 
  38.     } 
  39.   } 
  40.  
  41.   public void Acquire(TimeSpan timeout) 
  42.   { 
  43.     uint milliseconds = (uint)timeout.TotalMilliseconds; 
  44.     if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0) 
  45.       throw new SemaphoreFailedException();   
  46.   } 
  47.  
  48.   public void Release() 
  49.   { 
  50.     int previousCount; 
  51.     if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) 
  52.       throw new SemaphoreFailedException();   
  53.   } 
  54.  
  55.   #region IDisposable Member 
  56.   public void Dispose() 
  57.   { 
  58.     if(handle != 0) 
  59.     { 
  60.       if(NTKernel.CloseHandle(handle)) 
  61.         handle = 0; 
  62.     } 
  63.   } 
  64.   #endregion 

有一点很重要:win32中的信号量是可以命名的。这允许其他进程通过名字来创建相应信号量的句柄。为了让阻塞线程可以中断,我们使用了一个(不好)的替代方法:使用超时和 Sleep(0)。我们需要中断来安全关闭线程。更好的做法是:确定没有线程阻塞之后才释放信号量,这样程序才可以完全释放资源并正确退出。

你可能也注意到了:跨线程和跨进程的信号量都使用了相同的接口。所有相关的类都使用了这种模式,以实现上面背景介绍中提到的封闭性。需要注意:出于性能考虑,你不应该将跨进程的信号量用到跨线程的场景,也不应该将跨线程的实现用到单线程的场景。

#p#

3. 跨进程共享内存:内存映射文件

我们已经实现了跨线程和跨进程的共享资源访问同步。但是传递/接收消息还需要共享资源。对于线程来说,只需要声明一个类成员变量就可以了。但是对于跨进程来说,我们需要使用到 win32 API 提供的内存映射文件(Memory Mapped Files,简称MMF)。使用 MMF和使用 win32 信号量差不多。我们需要先调用 CreateFileMapping 方法来创建一个内存映射文件的句柄:

 
 
 
 
  1. [DllImport("Kernel32.dll",EntryPoint="CreateFileMapping", 
  2.      SetLastError=true,CharSet=CharSet.Unicode)] 
  3. internal static extern IntPtr CreateFileMapping(uint hFile,  
  4.  SecurityAttributes lpAttributes, uint flProtect, 
  5.   uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); 
  6.      
  7. [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", 
  8.  SetLastError=true,CharSet=CharSet.Unicode)] 
  9. internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,  
  10.   uint dwDesiredAccess, uint dwFileOffsetHigh, 
  11.   uint dwFileOffsetLow, uint dwNumberOfBytesToMap); 
  12.      
  13. [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", 
  14.  SetLastError=true,CharSet=CharSet.Unicode)] 
  15. [return : MarshalAs( UnmanagedType.VariantBool )] 
  16. internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress); 
 
 
 
 
  1. public static MemoryMappedFile CreateFile(string name,  
  2.      FileAccess access, int size) 
  3.   if(size < 0) 
  4.     throw new ArgumentException("Size must not be negative","size"); 
  5.  
  6.   IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null, 
  7.    (uint)access,0,(uint)size,name); 
  8.   if(fileMapping == IntPtr.Zero) 
  9.     throw new MemoryMappingFailedException(); 
  10.  
  11.   return new MemoryMappedFile(fileMapping,size,access); 

我们希望直接使用 pagefile 中的虚拟文件,所以我们用 -1(0xFFFFFFFF) 来作为文件句柄来创建我们的内存映射文件句柄。我们也指定了必填的文件大小,以及相应的名称。这样其他进程就可以通过这个名称来同时访问该映射文件。创建了内存映射文件后,我们就可以映射这个文件不同的部分(通过偏移量和字节大小来指定)到我们的进程地址空间。我们通过 MapViewOfFile 系统方法来指定:

 
 
 
 
  1. public MemoryMappedFileView CreateView(int offset, int size, 
  2.       MemoryMappedFileView.ViewAccess access) 
  3.   if(this.access == FileAccess.ReadOnly && access ==  
  4.     MemoryMappedFileView.ViewAccess.ReadWrite) 
  5.     throw new ArgumentException( 
  6.      "Only read access to views allowed on files without write access", 
  7.      "access"); 
  8.   if(offset < 0) 
  9.     throw new ArgumentException("Offset must not be negative","size"); 
  10.   if(size < 0) 
  11.     throw new ArgumentException("Size must not be negative","size"); 
  12.   IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, 
  13.    (uint)access,0,(uint)offset,(uint)size); 
  14.   return new MemoryMappedFileView(mappedView,size,access); 

在不安全的代码中,我们可以将返回的指针强制转换成我们指定的类型。尽管如此,我们不希望有不安全的代码存在,所以我们使用 Marshal 类来从中读写我们的数据。偏移量参数是用来从哪里开始读写数据,相对于指定的映射视图的地址。

 
 
 
 
  1. public byte ReadByte(int offset) 
  2.   return Marshal.ReadByte(mappedView,offset); 
  3. public void WriteByte(byte data, int offset) 
  4.   Marshal.WriteByte(mappedView,offset,data); 
  5.  
  6. public int ReadInt32(int offset) 
  7.   return Marshal.ReadInt32(mappedView,offset); 
  8. public void WriteInt32(int data, int offset) 
  9.   Marshal.WriteInt32(mappedView,offset,data); 
  10.  
  11. public void ReadBytes(byte[] data, int offset) 
  12.   for(int i=0;i
  13.     data[i] = Marshal.ReadByte(mappedView,offset+i); 
  14. public void WriteBytes(byte[] data, int offset) 
  15.   for(int i=0;i
  16.     Marshal.WriteByte(mappedView,offset+i,data[i]); 

但是,我们希望读写整个对象树到文件中,所以我们需要支持自动进行序列化和反序列化的方法。

 
 
 
 
  1. public object ReadDeserialize(int offset, int length) 
  2.   byte[] binaryData = new byte[length]; 
  3.   ReadBytes(binaryData,offset); 
  4.   System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter 
  5.     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); 
  6.   System.IO.MemoryStream ms = new System.IO.MemoryStream( 
  7.    binaryData,0,length,true,true); 
  8.   object data = formatter.Deserialize(ms); 
  9.   ms.Close(); 
  10.   return data; 
  11. public void WriteSerialize(object data, int offset, int length) 
  12. System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter 
  13.     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); 
  14.   byte[] binaryData = new byte[length]; 
  15.   System.IO.MemoryStream ms = new System.IO.MemoryStream( 
  16.    binaryData,0,length,true,true); 
  17.   formatter.Serialize(ms,data); 
  18.   ms.Flush(); 
  19.   ms.Close(); 
  20.   WriteBytes(binaryData,offset); 

请注意:对象序列化之后的大小不应该超过映射视图的大小。序列化之后的大小总是比对象本身占用的内存要大的。我没有试过直接将对象内存流绑定到映射视图,那样做应该也可以,甚至可能带来少量的性能提升。

#p#

4. 信箱:在线程/进程间传递消息

这里的信箱与 Email 及 NT 中的邮件槽(Mailslots)无关。它是一个只能保留一个对象的安全共享内存结构。信箱的内容通过一个属性来读写。如果信箱内容为空,试图读取该信箱的线程将会阻塞,直到另一个线程往其中写内容。如果信箱已经有了内容,当一个线程试图往其中写内容时将被阻塞,直到另一个线程将信箱内容读取出去。信箱的内容只能被读取一次,它的引用在读取后自动被删除。基于上面的代码,我们已经可以实现信箱了。

4.1 跨线程的信箱

我们可以使用两个信号量来实现一个信箱:一个信号量在信箱内容为空时触发,另一个在信箱有内容时触发。在读取内容之前,线程先等待信箱已经填充了内容,读取之后触发空信号量。在写入内容之前,线程先等待信箱内容清空,写入之后触发满信号量。注意:空信号量在一开始时就被触发了。

 
 
 
 
  1. public sealed class ThreadMailBox : IMailBox 
  2.   private object content; 
  3.   private ThreadSemaphore empty, full; 
  4.  
  5.   public ThreadMailBox() 
  6.   { 
  7.     empty = new ThreadSemaphore(1,1); 
  8.     full = new ThreadSemaphore(0,1); 
  9.   } 
  10.  
  11.   public object Content 
  12.   { 
  13.     get 
  14.     { 
  15.       full.Acquire(); 
  16.       object item = content; 
  17.       empty.Release(); 
  18.       return item; 
  19.     } 
  20.     set  
  21.     { 
  22.       empty.Acquire(); 
  23.       content = value; 
  24.       full.Release(); 
  25.     } 
  26.   } 

4.2  跨进程信箱

跨进程信箱与跨线程信箱的实现基本上一样简单。不同的是我们使用两个跨进程的信号量,并且我们使用内存映射文件来代替类成员变量。由于序列化可能会失败,我们使用了一小段异常处理来回滚信箱的状态。失败的原因有很多(无效句柄,拒绝访问,文件大小问题,Serializable属性缺失等等)。

 
 
 
 
  1. public sealed class ProcessMailBox : IMailBox, IDisposable 
  2.   private MemoryMappedFile file; 
  3.   private MemoryMappedFileView view; 
  4.   private ProcessSemaphore empty, full; 
  5.  
  6.   public ProcessMailBox(string name,int size) 
  7.   { 
  8.     empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1); 
  9.     full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1); 
  10.     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox", 
  11.       MemoryMappedFile.FileAccess.ReadWrite,size); 
  12.     view = file.CreateView(0,size, 
  13.      MemoryMappedFileView.ViewAccess.ReadWrite); 
  14.   } 
  15.  
  16.   public object Content 
  17.   { 
  18.     get 
  19.     { 
  20.       full.Acquire(); 
  21.       object item; 
  22.       try {item = view.ReadDeserialize();} 
  23.       catch(Exception e) 
  24.       {  //Rollback 
  25.         full.Release(); 
  26.         throw e; 
  27.       } 
  28.       empty.Release(); 
  29.       return item; 
  30.     } 
  31.  
  32.     set  
  33.     { 
  34.       empty.Acquire(); 
  35.       try {view.WriteSerialize(value);} 
  36.       catch(Exception e) 
  37.       {  //Rollback 
  38.         empty.Release(); 
  39.         throw e; 
  40.       } 
  41.       full.Release(); 
  42.     } 
  43.   } 
  44.  
  45.   #region IDisposable Member 
  46.   public void Dispose() 
  47.   { 
  48.     view.Dispose(); 
  49.     file.Dispose(); 
  50.     empty.Dispose(); 
  51.     full.Dispose(); 
  52.   } 
  53.   #endregion 

到这里我们已经实现了跨进程消息传递(IPC)所需要的组件。你可能需要再回头本文开头的那个例子,看看 ProcessMailBox 应该如何使用

#p#

5.通道:基于队列的消息传递

信箱最大的限制是它们每次只能保存一个对象。如果一系列线程(使用同一个信箱)中的一个线程需要比较长的时间来处理特定的命令,那么整个系列都会阻塞。通常我们会使用缓冲的消息通道来处理,这样你可以在方便的时候从中读取消息,而不会阻塞消息发送者。这种缓冲通过通道来实现,这里的通道比信箱要复杂一些。同样,我们将分别从线程和进程级别来讨论通道的实现。

5.1 可靠性

信箱和通道的另一个重要的不同是:通道拥有可靠性。例如:自动将发送失败(可能由于线程等待锁的过程中被中断)的消息转存到一个内置的容器中。这意味着处理通道的线程可以安全地停止,同时不会丢失队列中的消息。这通过两个抽象类来实现, ThreadReliability 和 ProcessReliability。每个通道的实现类都继承其中的一个类。

5.2 跨线程的通道

跨线程的通道基于信箱来实现,但是使用一个同步的队列来作为消息缓冲而不是一个变量。得益于信号量,通道在空队列时阻塞接收线程,在队列满时阻塞发送线程。这样你就不会碰到由入队/出队引发的错误。为了实现这个效果,我们用队列大小来初始化空信号量,用0来初始化满信号量。如果某个发送线程在等待入队的时候被中断,我们将消息复制到内置容器中,并将异常往外面抛。在接收操作中,我们不需要做异常处理,因为即使线程被中断你也不会丢失任何消息。注意:线程只有在阻塞状态才能被中断,就像调用信号量的获取操作(Aquire)方法时。

 
 
 
 
  1. public sealed class ThreadChannel : ThreadReliability, IChannel 
  2.   private Queue queue; 
  3.   private ThreadSemaphore empty, full; 
  4.  
  5.   public ThreadChannel(int size) 
  6.   { 
  7.     queue = Queue.Synchronized(new Queue(size)); 
  8.     empty = new ThreadSemaphore(size,size); 
  9.     full = new ThreadSemaphore(0,size); 
  10.   } 
  11.  
  12.   public void Send(object item) 
  13.   { 
  14.     try {empty.Acquire();} 
  15.     catch(System.Threading.ThreadInterruptedException e) 
  16.     { 
  17.       DumpItem(item); 
  18.       throw e; 
  19.     } 
  20.     queue.Enqueue(item); 
  21.     full.Release(); 
  22.   } 
  23.  
  24.   public void Send(object item, TimeSpan timeout) 
  25.   { 
  26.     try {empty.Acquire(timeout);} 
  27.     ... 
  28.   } 
  29.  
  30.   public object Receive() 
  31.   { 
  32.     full.Acquire(); 
  33.     object item = queue.Dequeue(); 
  34.     empty.Release(); 
  35.     return item; 
  36.   } 
  37.  
  38.   public object Receive(TimeSpan timeout) 
  39.   { 
  40.     full.Acquire(timeout); 
  41.     ... 
  42.   } 
  43.    
  44.   protected override void DumpStructure() 
  45.   { 
  46.     lock(queue.SyncRoot) 
  47.     { 
  48.       foreach(object item in queue) 
  49.         DumpItem(item); 
  50.       queue.Clear(); 
  51.     } 
  52.   } 

5.3 跨进程通道

实现跨进程通道有点麻烦,因为你需要首先提供一个跨进程的缓冲区。一个可能的解决方法是使用跨进程信箱并根据需要将接收/发送方法加入队列。为了避免这种方案的几个缺点,我们将直接使用内存映射文件来实现一个队列。MemoryMappedArray 类将内存映射文件分成几部分,可以直接使用数组索引来访问。 MemoryMappedQueue 类,为这个数组提供了一个经典的环(更多细节请查看附件中的代码)。为了支持直接以 byte/integer 类型访问数据并同时支持二进制序列化,调用方需要先调用入队(Enqueue)/出队(Dequeue)操作,然后根据需要使用读写方法(队列会自动将数据放到正确的位置)。这两个类都不是线程和进程安全的,所以我们需要使用跨进程的信号量来模拟互斥量(也可以使用 win32 互斥量),以此实现相互间的互斥访问。除了这两个类,跨进程的通道基本上和跨线程信箱一样。同样,我们也需要在 Send() 中处理线程中断及序列化可能失败的问题。

 
 
 
 
  1. public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable 
  2.   private MemoryMappedFile file; 
  3.   private MemoryMappedFileView view; 
  4.   private MemoryMappedQueue queue; 
  5.   private ProcessSemaphore empty, full, mutex; 
  6.  
  7.   public ProcessChannel( int size, string name, int maxBytesPerEntry) 
  8.   { 
  9.     int fileSize = 64+size*maxBytesPerEntry; 
  10.  
  11.     empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size); 
  12.     full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size); 
  13.     mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1); 
  14.     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel", 
  15.       MemoryMappedFile.FileAccess.ReadWrite,fileSize); 
  16.     view = file.CreateView(0,fileSize, 
  17.      MemoryMappedFileView.ViewAccess.ReadWrite); 
  18.     queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0); 
  19.     if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry) 
  20.       throw new MemoryMappedArrayFailedException(); 
  21.   } 
  22.  
  23.   public void Send(object item) 
  24.   { 
  25.     try {empty.Acquire();} 
  26.     catch(System.Threading.ThreadInterruptedException e) 
  27.     { 
  28.       DumpItemSynchronized(item); 
  29.       throw e; 
  30.     } 
  31.     try {mutex.Acquire();} 
  32.     catch(System.Threading.ThreadInterruptedException e) 
  33.     { 
  34.       DumpItemSynchronized(item); 
  35.       empty.Release(); 
  36.       throw e; 

    网站名称:一个进程间同步和通讯的C#框架
    文章网址:http://www.mswzjz.cn/qtweb/news1/11201.html

    攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

    广告

    声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能