Интеграция приложений на основе WebSphere MQ

         

Программирование транзакций


Сообщения WebSphere MQ могут быть четырех типов:

  • Datagram - простое сообщение, не требующее ответа;
  • Request - сообщение-запрос, которое ожидает сообщение-ответ (reply message);
  • Reply - сообщение-ответ на сообщение-запрос;
  • Report - сообщение, которое описывает такое событие, как появление ошибки.

Наша очередная задача: на сервере 1 прочитать сообщение из входной очереди, положить её в очередь для отправки на сервер 2 как сообщение-запрос и дождаться прихода сообщения-ответа, как это показано на рис.9.2. Все это необходимо оформить в виде транзакции, для которой будет осуществляться откат в случае неполучения сообщения-ответа в течении 10 сек. Эта задача может использоваться в практических целях при нестабильной работе каналов, например выделенных. Наше приложение при откате транзакции может попытаться перенаправить сообщений из входной очереди – но это уже другая задача.


Рис. 9.2.  Структура объектов WebSphere MQ

Итак, последовательность псевдокода представляется следующим образом (обратите внимание на блок 5 и опции MQMD):

Блок 1 MQCONN Блок 2 MQOPEN Блок 3 MQBEGIN Блок 4 MQGET (Input_queue) Блок 5 MQPUT (Output_queue, MQMD.MsgType = MQMT_REQUEST, MQMD.ReplyToQ = Reply_queue) Блок 6 MQGET (Reply_queue) Блок 7 If Reply time < 10 sec then MQCMIT else MQBACK; Блок 8 MQCLOSE Блок 9 MQDISC

Назовем нашу программу transmit.exe и файл инициализации transmit.ini, в котором 1-я строка – имя очереди для чтения, 2-я строка – имя очереди для записи, 3-я строка – имя очереди для ответа, 4-я строка – время ожидания ответа Reply_time = 3000мсек, как показано ниже.

QUEUE_INPUT QUEUE_OUTPUT QUEUE_REPLY 3000

Тип очереди Output_queue – remote queue и эта очередь настроена для отправки сообщений на сервер 2. На сервере 2 также выполнены соответствующие настройки и при нормальной работе каналов транзакция будет совершаться успешно. Отметим также, что сообщение-ответ формируется на сервере 2 средствами другого приложения на этом сервере. В случае остановки любого канала, которую мы произведем для отладки программы, будет происходить откат транзакции. В данной версии в начале программы производится извлечение параметров из ini-файла. Такую программу полезно также иметь в виде триггера и читателю предлагается самостоятельно модифицировать программу для считывания параметров триггера из очереди, на которую он навешивается.


Ниже приводится листинг программы transmit.cpp для Microsoft Visual C++ ver.6.0. Для каждого сообщения MsgId и the CorrelId создаются как уникальные (MSGID= MQMI_NONE и CORRELID= MQCI_NONE) и об этом подробнее в лекции 11.

Листинг 9.2. Программа transmit.cpp для Microsoft Visual C++ ver.6.0. (html, txt)

По тексту программы следует дать комментарии. Наличие опции

gmo.Options = MQGMO_SYNCPOINT;

подразумевает, что команда MQBEGIN может не указываться. Операторы

md.MsgType = MQMT_REQUEST; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH);



определяют тип сообщения REQUEST и очередь ответа, заданную в QUEUE_REPLY.

На очередь QUEUE_OUTPUT (или на удаленную очередь на другом менеджере) должна быть навешена программа-триггер, который возвращает сообщения типа Reply. Если Reply-сообщение поступает в очередь QUEUE_REPLY, то транзакция завершается успешно, в противном случае производится откат транзакции и сообщение восстанавливается в очереди QUEUE_INPUT. Reply-сообщение должно иметь идентификатор CorrelId такой же, как и MsgId исходного сообщения. В данной версии программы в целях упрощения отладки не проверяется это условие и читателю предлагается самостоятельно дописать этот фрагмент кода после отладки текущей версии программы. Работа с MsgId и CorrelId будет рассмотрена подробнее в лекции 11.

Программу-триггер, которая "навешивается" на очередь QUEUE_OUTPUT (или на удаленную очередь) для формирования Reply-сообщения (md.MsgType = MQMT_REPLY;), читателю также предлагается сделать самостоятельно.

На данном примере мы познакомились с WebSphere MQ транзакциями, являющимися основой создания надежных программ для передачи сообщений. Если сообщение приходит на сервер в очередь, то программа опроса очереди открывает внешнюю транзакцию для работы с WebSphere MQ и передает управление подпрограмме записи сообщения в базу данных, которая открывает внутреннюю транзакцию для работы с базой данных (БД). Если сообщение уходит из базы данных, то открывается внешняя транзакция работы с БД, далее открывается внутренняя транзакцию для работы с WebSphere MQ и идет помещение сообщения в очередь, из которой это сообщение "улетает" на другой сервер. Завершение транзакций и откат транзакций обоих типов осуществляется взаимосвязанно. Это и есть правильный стиль интеграции приложений на основе WebSphere MQ.



O_options = MQOO_BROWSE + MQOO_INPUT_SHARED ; MQOPEN(Hcon, &odG, O_options, &Hobj, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN % s ended with reason code %ld\n", queue_input, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }

O_options = MQOO_BROWSE + MQOO_INPUT_SHARED ; MQOPEN(Hcon, &odR, O_options, &Hrep, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN %s ended with reason code %ld\n", queue_reply, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }

O_options = MQOO_OUTPUT ; MQOPEN(Hcon, &odP, O_options, &Hout, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN %s ended with reason code %ld\n", queue_output, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }

while (CompCode == MQCC_OK) { buflen = sizeof(buffer) - 1; memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId)); gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG + MQGMO_WAIT + MQGMO_SYNCPOINT; gmo.WaitInterval = 3000 ;

MQBEGIN (Hcon, &mbo, &CompCode, &Reason); MQGET(Hcon, Hobj, &md, &gmo, buflen, buffer, &messlen, &CompCode, &Reason); //if (Reason != MQRC_NONE) { printf("MQGET from %s ended with reason code %ld\n", queue_input, Reason); }

if ((CompCode == MQCC_OK) || (CompCode == MQCC_WARNING)) { buffer[messlen] = '\0'; /* заносим символ конец строки в буфер с прочитанным сообщением */ buflen = messlen; md.MsgType = MQMT_REQUEST; md.Report = MQRO_EXCEPTION_WITH_DATA; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH); memcpy(md.Format, MQFMT_STRING, MQ_FORMAT_LENGTH);

MQPUT(Hcon, Hout, &md, &pmo, buflen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQPUT to %s ended ended unsuccessfully with reason code %ld CompCode %ld\n", queue_output, Reason, CompCode ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } else {

while (CompCode != MQCC_FAILED) { /** осуществляется проверка queue_reply **/ gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG + MQGMO_WAIT ; gmo.WaitInterval = 3000 ; memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId)); MQGET(Hcon, Hrep, &md, &gmo, buflen, buffer, &replylen, &CompCode, &Reason); if (CompCode != MQCC_FAILED) { if (md.MsgType == MQMT_REPLY) /* report feedback */ { printf("Transaction % s=> %s successfully: %s\n", queue_input, queue_output, buffer); MQCMIT( Hcon, &CompCode, &Reason ) ; } else { printf("Transaction % s=> %s successfully, REPLY message not deliver, reason code %ld CompCode %ld\n", queue_input, queue_output, queue_reply, Reason, CompCode ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } }



if (Reason == MQRC_NO_MSG_AVAILABLE) { printf("Transaction % s=> % s UNsuccessfully, REPLY message not deliver\n", queue_input, queue_output ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } } } } } C_options = 0; MQCLOSE(Hcon, &Hobj, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_input, Reason); } MQCLOSE(Hcon, &Hout, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_output, Reason); } MQCLOSE(Hcon, &Hrep, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_reply, Reason); }

MQDISC(&Hcon, &CompCode, &Reason); if (Reason != MQRC_NONE){ printf("MQDISC ended with reason code %ld\n", Reason); } return(0); }

Листинг 9.2. Программа transmit.cpp для Microsoft Visual C++ ver.6.0.

По тексту программы следует дать комментарии. Наличие опции

gmo.Options = MQGMO_SYNCPOINT;

подразумевает, что команда MQBEGIN может не указываться. Операторы

md.MsgType = MQMT_REQUEST; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH);

определяют тип сообщения REQUEST и очередь ответа, заданную в QUEUE_REPLY.

На очередь QUEUE_OUTPUT (или на удаленную очередь на другом менеджере) должна быть навешена программа-триггер, который возвращает сообщения типа Reply. Если Reply-сообщение поступает в очередь QUEUE_REPLY, то транзакция завершается успешно, в противном случае производится откат транзакции и сообщение восстанавливается в очереди QUEUE_INPUT. Reply-сообщение должно иметь идентификатор CorrelId такой же, как и MsgId исходного сообщения. В данной версии программы в целях упрощения отладки не проверяется это условие и читателю предлагается самостоятельно дописать этот фрагмент кода после отладки текущей версии программы. Работа с MsgId и CorrelId будет рассмотрена подробнее в лекции 11.



Программу-триггер, которая "навешивается" на очередь QUEUE_OUTPUT ( или на удаленную очередь) для формирования Reply-сообщения (md.MsgType = MQMT_REPLY;), читателю также предлагается сделать самостоятельно.

На данном примере мы познакомились с WebSphere MQ транзакциями, являющимися основой создания надежных программ для передачи сообщений. Если сообщение приходит на сервер в очередь, то программа опроса очереди открывает внешнюю транзакцию для работы с WebSphere MQ и передает управление подпрограмме записи сообщения в базу данных, которая открывает внутреннюю транзакцию для работы с базой данных (БД). Если сообщение уходит из базы данных, то открывается внешняя транзакция работы с БД, далее открывается внутренняя транзакцию для работы с WebSphere MQ и идет помещение сообщения в очередь, из которой это сообщение "улетает" на другой сервер. Завершение транзакций и откат транзакций обоих типов осуществляется взаимосвязанно. Это и есть правильный стиль интеграции приложений на основе WebSphere MQ.


Содержание раздела