C#消息隊(duì)列應(yīng)用程序 -1
發(fā)表時(shí)間:2024-02-18 來(lái)源:明輝站整理相關(guān)軟件相關(guān)文章人氣:
[摘要]簡(jiǎn)介 Microsoft近期推出一種用于生成集成應(yīng)用程序的新平臺(tái)——Microsoft .NET框架。.NET 框架允許開(kāi)發(fā)人員使用任何編程語(yǔ)言迅速生成和部署Web 服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL)和實(shí)時(shí) (JIT )編譯器使這種不依...
簡(jiǎn)介
Microsoft近期推出一種用于生成集成應(yīng)用程序的新平臺(tái)——Microsoft
.NET框架。.NET 框架允許開(kāi)發(fā)人員使用任何編程語(yǔ)言迅速生成和部署Web
服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL)和實(shí)時(shí)
(JIT )編譯器使這種不依賴語(yǔ)言的框架得以實(shí)現(xiàn)。
與.NET框架同時(shí)面世的還有一種新的編程語(yǔ)言C#(讀“C sharp”)。
C#是一種簡(jiǎn)單、新穎、面向?qū)ο蠛皖愋桶踩木幊陶Z(yǔ)言。利用 .NET 框架
和 C# (除 Microsoft? Visual Basic ?和 Managed C++之外),用戶
可以編寫功能強(qiáng)大的 Microsoft Windows?和 Web應(yīng)用程序及服務(wù)。本文
提供了這樣的一個(gè)解決方案,它的重點(diǎn)是 .NET 框架和 C# 而不是編程語(yǔ)
言。C#語(yǔ)言的介紹可以在“ C# 簡(jiǎn)介和概述(英文)”找到。
近期的文章“MSMQ:可伸縮、高可用性的負(fù)載平衡解決方案(英文)”
介紹了一種解決方案,用于高可用性消息隊(duì)列(MSMQ)的可伸縮負(fù)載平衡
解決方案體系結(jié)構(gòu)。此解決方案中涉及了一種將 Windows服務(wù)用作智能消
息路由器的開(kāi)發(fā)方案。這樣的解決方案以前只有 Microsoft Visual C++
程序員才能實(shí)現(xiàn),而 .NET 框架的出現(xiàn)改變了這種情況。從下面的解決方
案中,您可以看到這一點(diǎn)。
.NET 框架應(yīng)用程序
這里介紹的解決方案是一種用來(lái)處理若干消息隊(duì)列的 Windows服務(wù);
其中每個(gè)隊(duì)列都是由多個(gè)線程進(jìn)行處理(接收和處理消息)。處理程序使
用循環(huán)法技術(shù)或應(yīng)用程序特定值(消息 AppSpecific屬性)從目的隊(duì)列列
表中路由消息,并使用消息屬性來(lái)調(diào)用組件方法。(示例進(jìn)程也屬于這種
情況。)在后一種情況下,組件的要求是它能夠?qū)崿F(xiàn)給定的接口IWeb
Message要處理錯(cuò)誤,應(yīng)用程序需要將不能處理的消息發(fā)送到錯(cuò)誤隊(duì)列中。
消息應(yīng)用程序的結(jié)構(gòu)與以前的活動(dòng)模板庫(kù)(ATL )應(yīng)用程序相似,它
們之間的主要不同在于用于管理服務(wù)的代碼的封裝和 .NET 框架組件的使
用。要?jiǎng)?chuàng)建Windows服務(wù),.NET框架用戶僅僅需要?jiǎng)?chuàng)建一個(gè)從 ServiceBase
(來(lái)自System.ServiceControl程序集)繼承的類。這毫不奇怪,因?yàn)?NET
框架是面向?qū)ο蟮摹?
應(yīng)用程序結(jié)構(gòu)
應(yīng)用程序中主要的類是 ServiceControl ,它是從 ServiceBase繼承
的。因而,它必須實(shí)現(xiàn) OnStart和 OnStop 方法,以及可選的 OnPause和
OnContinue方法。事實(shí)上,類是在靜態(tài)方法 Main 內(nèi)構(gòu)造的:
using System;
using System.ServiceProcess;
public class ServiceControl: ServiceBase
{
// 創(chuàng)建服務(wù)對(duì)象的主入口點(diǎn)
public static void Main()
{
ServiceBase.Run(new ServiceControl());
}
// 定義服務(wù)參數(shù)的構(gòu)造對(duì)象
public ServiceControl()
{
CanPauseAndContinue = true;
ServiceName = "MSDNMessageService";
AutoLog = false;
}
protected override void OnStart(string[] args) {...}
protected override void OnStop() {...}
protected override void OnPause() {...}
protected override void OnContinue() {...}
}
ServiceControl類創(chuàng)建一系列 CWorker對(duì)象,即,為需要處理的每個(gè)
消息隊(duì)列創(chuàng)建 CWorker類的一個(gè)實(shí)例。根據(jù)定義中處理隊(duì)列所需的線程數(shù)
目,CWorker 類依次創(chuàng)建了一系列的 CWorkerThread對(duì)象。CWorkerThread
類創(chuàng)建的一個(gè)處理線程將執(zhí)行實(shí)際的服務(wù)工作。
使用 CWorker和 CWorkerThread類的主要目的是確認(rèn)服務(wù)控件 Start、
Stop、Pause 和 Continue 命令。因?yàn)檫@些進(jìn)程必須是無(wú)阻塞的,命令操
作最終將在后臺(tái)處理線程上執(zhí)行。
CWorkerThread 是一個(gè)抽象類,被 CWorkerThreadAppSpecific 、
CWorkerThreadRoundRobin 和 CWorkerThreadAssembly繼承。這些類以不
同的方式處理消息。前兩個(gè)類通過(guò)給另一隊(duì)列發(fā)送消息來(lái)處理消息(其不
同之處在于確定接收隊(duì)列路徑的方式),最后一個(gè)類則使用消息屬性來(lái)調(diào)
用組件方法。
.NET 框架內(nèi)部的錯(cuò)誤處理是以基類 Exception為基礎(chǔ)的。當(dāng)系統(tǒng)引
發(fā)或捕獲錯(cuò)誤時(shí),這些錯(cuò)誤必須是從 Exception中導(dǎo)出的類。CWorker
ThreadException 類就是這樣一種實(shí)現(xiàn),它通過(guò)附加額外屬性(用于定義
服務(wù)是否應(yīng)繼續(xù)運(yùn)行)來(lái)擴(kuò)展基類。
最后,應(yīng)用程序包含兩種結(jié)構(gòu)。這些值類型定義了輔助進(jìn)程或線程的
運(yùn)行時(shí)參數(shù),以簡(jiǎn)化 CWorker和 CWorkerThread對(duì)象的結(jié)構(gòu)。使用值類型
結(jié)構(gòu)(而不是引用類型類)能夠確保這些運(yùn)行時(shí)參數(shù)維護(hù)的是數(shù)值(而不
是引用)。
IWebMessage 接口
CWorkerThread 的實(shí)現(xiàn)之一是一個(gè)調(diào)用組件方法的類。這個(gè)名為
CWorkerThreadAssembly 的類使用 IWebMessage接口來(lái)定義服務(wù)和組件之
間的約定。
與當(dāng)前版本的 Microsoft Visual Studio?不同,C#接口可以在任何
語(yǔ)言中顯式定義,而不需要?jiǎng)?chuàng)建和編譯 IDL文件。C# IWebMessage接口的
定義如下:
public interface IWebMessage
{
WebMessageReturn Process(string sMessageLabel, string sMessage
Body, int iAppSpecific);
void Release();
}
ATL 代碼中的 Process 方法是為處理消息而指定的。Process 方法的返
回代碼定義為枚舉類型 WebMessageReturn:
public enum WebMessageReturn
{
ReturnGood,
ReturnBad,
ReturnAbort
}
枚舉的定義如下:Good表示繼續(xù)處理,Bad 表示將消息寫入錯(cuò)誤隊(duì)列,
Abort 表示終止處理。Release 方法為服務(wù)提供了輕松清除類實(shí)例的途徑。
因?yàn)閮H在垃圾回收的過(guò)程中才調(diào)用類實(shí)例的析構(gòu)函數(shù),所以確保所有占用
昂貴資源(例如數(shù)據(jù)庫(kù)連接)的類都有一個(gè)能夠在析構(gòu)之前被調(diào)用的方法,
用來(lái)釋放這些資源,這是一種非常好的構(gòu)思。
名稱空間
在這里先簡(jiǎn)單介紹一下名稱空間。名稱空間允許在內(nèi)部和外部表示中
將應(yīng)用程序組織成為邏輯元素。服務(wù)內(nèi)的所有代碼都包含在 MSDNMessage
Service.Service 名稱空間內(nèi)。盡管服務(wù)代碼包含在若干文件中,但是由
于它們包含在同一名稱空間中,因此用戶不需要引用其他文件。
由于 IWebMessage接口包含在 MSDNMessageService.Interface 名稱
空間中,因此使用此接口的線程類具有一個(gè)接口名稱空間。
服務(wù)類
應(yīng)用程序的目的是監(jiān)視和處理消息隊(duì)列,每一隊(duì)列在收到消息時(shí)都執(zhí)
行不同的進(jìn)程。應(yīng)用程序是作為 Windows服務(wù)來(lái)實(shí)現(xiàn)的。
ServiceBase 類
如前所述,服務(wù)的基本結(jié)構(gòu)是從 ServiceBase繼承的類。重要的方法
包括 OnStart、OnStop、OnPause 和 OnContinue ,每一個(gè)替代方法都與
一個(gè)服務(wù)控制操作直接對(duì)應(yīng)。OnStart 方法的目的是創(chuàng)建 CWorker對(duì)象,
而 CWorker類又創(chuàng)建 CWorkerThread對(duì)象,然后在該對(duì)象中創(chuàng)建執(zhí)行服務(wù)
工作的線程。
服務(wù)的運(yùn)行時(shí)配置(以及 CWorker和 CWorkerThread對(duì)象的屬性)是
在基于 XML的配置文件中維護(hù)的。它的名稱與創(chuàng)建的 .exe 文件相同,但
帶有一個(gè) .cfg 后綴。配置示例如下:
〈?xml version="1.0"?〉
〈configuration〉
〈ProcessList〉
〈ProcessDefinition
ProcessName="Worker1"
ProcessDesc="Message Worker with 2 Threads"
ProcessType="AppSpecific"
ProcessThreads="2"
InputQueue=".\private$\test_load1"
ErrorQueue=".\private$\test_error"〉
〈OutputList〉
〈OutputDefinition OutputName=".\private$\test_out11" /〉
〈OutputDefinition OutputName=".\private$\test_out12" /〉
〈/OutputList〉
〈/ProcessDefinition〉
〈ProcessDefinition
ProcessName="Worker2"
ProcessDesc="Assembly Worker with 1 Thread"
ProcessType="Assembly"
ProcessThreads="1"
InputQueue=".\private$\test_load2"
ErrorQueue=".\private$\test_error"〉
〈OutputList〉
〈OutputDefinition OutputName="C:\MSDNMessageService\Message
Example.dll" /〉
〈OutputDefinition OutputName="MSDNMessageService.Message
Sample.ExampleClass"/〉
〈/OutputList〉
〈/ProcessDefinition〉
〈/ProcessList〉
〈/configuration〉
對(duì)此信息的訪問(wèn)通過(guò)來(lái)自 System.Configuration 程序集的 Config
Manager 類來(lái)管理。靜態(tài) Get方法返回信息的集合,這些集合將被枚舉以
獲得單個(gè)屬性。這些屬性集的設(shè)置決定了輔助對(duì)象的運(yùn)行時(shí)特征。除了這
一配置文件,您還應(yīng)該創(chuàng)建定義 XML文件結(jié)構(gòu)的圖元文件,并在其中引用
位于服務(wù)器 machine.cfg配置文件中的圖元文件:
〈?xml version ="1.0"?〉
〈MetaData xmlns="x-schema:CatMeta.xms"〉
〈DatabaseMeta InternalName="MessageService"〉
〈ServerWiring Interceptor="Core_XMLInterceptor"/〉
〈Collection
InternalName="Process" PublicName="ProcessList"
PublicRowName="ProcessDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="ProcessDesc" Type="String" /〉
〈Property InternalName="ProcessType" Type="Int32" Default
Value="RoundRobin" 〉
〈Enum InternalName="RoundRobin" Value="0"/〉
〈Enum InternalName="AppSpecific" Value="1"/〉
〈Enum InternalName="Assembly" Value="2"/〉
〈/Property〉
〈Property InternalName="ProcessThreads" Type="Int32"
DefaultValue="1" /〉
〈Property InternalName="InputQueue" Type="String" /〉
〈Property InternalName="ErrorQueue" Type="String" /〉
〈Property InternalName="OutputName" Type="String" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL" /〉
〈/Collection〉
〈Collection
InternalName="Output" PublicName="OutputList"
PublicRowName="OutputDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="OutputName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL" /〉
〈/Collection〉
〈/DatabaseMeta〉
〈RelationMeta
PrimaryTable="Process" PrimaryColumns="ProcessName"
ForeignTable="Output" ForeignColumns="ProcessName"
MetaFlags="USECONTAINMENT"/〉
〈/MetaData〉
由于 Service類必須維護(hù)一個(gè)已創(chuàng)建輔助對(duì)象的列表,因此使用了
Hashtable 集合,用于保持類型對(duì)象的名稱/ 數(shù)值對(duì)列表。Hashtable 不
僅支持枚舉,還允許通過(guò)關(guān)鍵字來(lái)查詢值。在應(yīng)用程序中,XML 進(jìn)程名稱
是唯一的關(guān)鍵字:
private Hashtable htWorkers = new Hashtable();
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new
AppDomainSelector());
foreach (IConfigItem ciWorker in cWorkers)
{
WorkerFormatter sfWorker = new WorkerFormatter();
sfWorker.ProcessName = (string)ciWorker["ProcessName"];
sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];
sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];
sfWorker.InputQueue = (string)ciWorker["InputQueue"];
sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];
// 計(jì)算并定義進(jìn)程類型
switch ((int)ciWorker["ProcessType"])
{
case 0:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessRoundRobin;
break;
case 1:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAppSpecific;
break;
case 2:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAssembly;
break;
default:
throw new Exception("Unknown Processing Type");
}
// 執(zhí)行更多的工作以讀取輸出信息
string sProcessName = (string)ciWorker["ProcessName"];
if (htWorkers.ContainsKey(sProcessName))
throw new ArgumentException("Process Name Must be Unique: "
+ sProcessName);
htWorkers.Add(sProcessName, new CWorker(sfWorker));
}
在這段代碼中沒(méi)有包含的主要信息是輸出數(shù)據(jù)的獲取。每一個(gè)進(jìn)程定
義中都有一組相應(yīng)的輸出定義項(xiàng)。該信息是通過(guò)如下的簡(jiǎn)單查詢讀取的:
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +
sfWorker.ProcessName + " AND Selector=appdomain://";
ConfigQuery qQuery = new ConfigQuery(sQuery);
IConfigCollection cOutputs = ConfigManager.Get("OutputList",
qQuery);
int iSize = cOutputs.Count, iLoop = 0;
sfWorker.OutputName = new string[iSize];
foreach (IConfigItem ciOutput in cOutputs)
sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];
CWorkerThread 和 Cworker類都有相應(yīng)的服務(wù)控制方法,根據(jù)服務(wù)控
制操作進(jìn)行調(diào)用。由于 Hashtable中引用了每一個(gè) CWorker對(duì)象,因此需
要枚舉 Hashtable的內(nèi)容,以調(diào)用適當(dāng)?shù)姆⻊?wù)控制方法:
foreach (CWorker cWorker in htWorkers.Values)
cWorker.Start();
類似地,實(shí)現(xiàn)的 OnPause、OnContinue和 OnStop 方法是通過(guò)調(diào)用
CWorker 對(duì)象上的相應(yīng)方法來(lái)執(zhí)行操作的。
CWorker 類
CWorker 類的主要功能是創(chuàng)建和管理 CWorkerThread對(duì)象。Start 、
Stop、Pause 和 Continue 方法調(diào)用相應(yīng)的 CWorkerThread方法。實(shí)際的
CWorkerThread 對(duì)象是在Start 方法中創(chuàng)建的。與使用 Hashtable管理輔
助對(duì)象引用的 Service類相似,CWorker 使用 ArrayList(簡(jiǎn)單的動(dòng)態(tài)數(shù)
組)來(lái)維護(hù)線程對(duì)象的列表。