Интеграция приложений на основе WebSphere MQ

         

Группировка и сегментация сообщений


До настоящего момента говорилось о физических сообщениях WebSphere MQ, и этого было вполне достаточно. Обычные сообщения длиной до 4Мб и сообщения максимальной длины в 100Мбт покрывают потребности любых приложений на 95%. Необходимость использования сверхбольших сообщений (> 100Мбт) или ограничения в приложениях на длину сообщений приводят к необходимости использовать понятия физических и логических сообщений, механизмы группирования и сегментации сообщений.

Физическое сообщение - наименьший блок информации, который может быть размещен в очереди. Одно физическое сообщение принято называть сегментом.

Логическое сообщение это либо отдельный блок информации приложения, либо один или несколько сегментов.

Группа сообщений это одно или несколько логических сообщений. Сегментация сообщений обычно используется, когда требуется работа с сообщениями, которые слишком большие для менеджера, очереди, либо приложения. Группы сообщений,как правило, применяются в следующих случаях:

  • требуется гарантировать упорядочение при поиске и при этом избежать использования MsgId и CorrelId;
  • требуется обеспечить объединение в группу сообщений, которые должны быть обработаны специфическим программным модулем и без использования MsgId и CorrelId.

Пусть необходимо объединить в группу три сообщения. Для этого в дескрипторе сообщений полю MQMD_GroupId (24 байта) нужно присвоить уникальный идентификатор и записать его в каждом сообщении. Следующему полю MQMD_MsgSeqNumber присвоить значения 1, 2 и 3 для первого, второго и третьего сообщений, соответственно, как показано это на рис.11.1. Если сообщение не включено в группу, то для него GroupId будет NULL (MQGI_NONE), а в MsgSeqNumber будет значение 1. WebSphere MQ может автоматизировать присвоение MsgSeqNumber во время выполнения функции MQPUT, если в PMO задана опция MQPMO_LOGICAL_ORDER. В этом случае также понадобится установить значение в новое поле дескриптора сообщений: MQMD.MsgFlags, которое может иметь значения MQMF_MSG_IN_GROUP и MQMF_LAST_MSG_IN_GROUP. Если в GroupId было установлено значение MQGI_NONE, то для GroupId будет сгенерирован новый уникальный идентификатор при использовании MQPMO_LOGICAL_ORDER. И еще одно правило для вывода группы сообщений: свойства сообщений PERSISTENCE и NOT_PERSISTENCE не должны перемешиваться в одной группе.


Для пояснения изложенного следует привести фрагмент программы, использующей вывод сообщений в виде группы.

if ((CompCode == MQCC_OK) || (CompCode == MQCC_WARNING)) { pmo.Options = MQPMO_LOGICAL_ORDER + MQPMO_SYNCPOINT ; memcpy(md.GroupId, "1", sizeof(md.GroupId)); memcpy(md. MsgFlags, "MQMF_MSG_IN_GROUP", sizeof(md. MsgFlags));

//memcpy(md.MsgSeqNumber, 1, // sizeof(md.MsgSeqNumber)); //memcpy(md.MsgFlags, MQMF_SEGMENT, // sizeof(md.MsgFlags));

/* блок формирования buffer */ MQPUT1(Hcon, &odR, &md, &pmo, messlen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) printf("MQPUT1 ended with reason code %ld\n", Reason); } /* конец цикла чтения поступающих сообщений */

memcpy(md. MsgFlags, "MQMF_ LAST_MSG_IN_GROUP", sizeof(md. MsgFlags));

/* блок формирования buffer */ MQPUT1(Hcon, &odR, &md, &pmo, messlen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) printf("MQPUT1 ended with reason code %ld\n", Reason);


Рис. 11.1.  Группа сообщений и сегментация сообщения

Процесс извлечения группы логических сообщений из очереди командой MQGET во многом будет зависеть от значений, заданных в опции GMO. Если задать значение MQGMO_ALL_MSGS_AVAILABLE, то сообщения будут доступны для чтения только, когда все сообщения группы поступят в очередь. Если задана опция MQGMO_LOGICAL_ORDER, то сообщения можно извлекать в порядке, определяемом MsgSeqNumber и только по одному сообщению из группы. Если опция MQGMO_LOGICAL_ORDER не задана, то приложение должно выбирать сообщения двумя способами:

  1. на основе корректных GroupId и MsgSeqNumber, либо переключаться на MQGMO_LOGICAL_ORDER, либо переслать сообщения дальше.
  2. на основе опций соответствия MatchOption, если они заданы:
    • MQMO_MATCH_MSG_ID,
    • MQMO_MATCH_CORREL_ID,
    • MQMO_MATCH_GROUP_ID,
    • MQMO_MATCH_MSG_SEQ_NUMBER.


    Например, опция CORREL_ID означает, что должно выбираться сообщение, корреляционный идентификатор которого, соответствует значению поля CorrelId параметра MsgDesc функции MQGET. Это согласование является дополнением к любому другому соответствию, заданному в MatchOption. Если корреляционный идентификатор есть MQCI_NONE, то это равносильно тому, что опция MQMO_MATCH_CORREL_ID не определена.



    Теперь следует дать расширенное определение сегмента сообщения. Сегментом сообщения называется физическое сообщение, идентифицируемое тройкой: GroupId, MsgSeqNumber и Offset. Из лекции 8 известно, что Offset это поле формата MQLONG в структурах MQOD и MQMD. Таким образом, одно логическое сообщение может состоять из нескольких сегментов, как показано на рис.11.1.

    Разбиение на сегменты (сегментация) и обратная процедура, называемая реассемблированием (reassembly) может осуществляться менеджером очередей или приложением. Для начала следует рассмотреть, как это осуществляет менеджер очередей. Если дескриптор сообщения имеет в поле MsgFlag значение MQMF_SEGMENTATION_ALLOWED, и менеджер очередей определил, что сообщение является слишком большим для его MaxMsgLength или для значения параметра MaxMsgLength для очереди, то тогда менеджер осуществляет сегментацию. Никаких предположений о том, как выполняется разбиение данных, сделано быть не может.

    Реассемблирование с помощью менеджера осуществляется, если при чтении сообщений командой MQGET была задана GMO-опция: MQGMO_COMPLETE_MESSAGE. В этом случае извлекаются все сегменты, и только полное логическое сообщение помещается в программный буфер. Следует быть внимательным при задании преобразования данных в sender-канале. Поскольку не все сегменты приходят одновременно, преобразование данных в канале в связи с разными кодовыми страницами может привести к ошибкам и рекомендуется задавать предобработку (UOW - unit of work) с помощью менеджера (опция MQRC_UOW_AVAILABLE) или приложения (опция MQRC_UOW_NOT_AVAILABLE).

    Сегментация с помощью приложения возможна в двух случаях:

    • Сегментация с помощью менеджера не решает проблему из-за того, что буфер приложения недостаточно большой для обработки полного сообщения;
    • Сегментация с помощью менеджера не может быть успешной из-за специфических границ сообщения и необходимости преобразование данных, которое должно осуществляться в канале.


    Для сегментации с помощью приложения необходимо включить для функции MQPUT опцию MQPMO_LOGICAL_ORDER и проследить за правильным включением в поле MsgFlags флажков MQMF_SEGMENT и MQMF_LAST_SEGMENT. Менеджер очередей позаботиться о назначении и поддержании GroupId, MsgSeqNumber и Offset.



    Реассемблирование с помощью приложения осуществляется, если при чтении сообщений командой MQGET была задана GMO-опция: MQGMO_COMPLETE_MESSAGE и флажок MsgFlags имеет значение MQMF_SEGMENTATION_ALLOWED. В этом случае приложение извлекает сегменты самостоятельно в зависимости от ограничений на свой размер буфера. Опции MQGMO_ALL_MSGS_AVAILABLE и MQGMO_ALL_SEGMENTS_AVAILABLE позволят начать работу, если все сообщения или все сегменты поступят в очередь. Как только первое сообщение поступит в очередь, можно использовать MQGMO_LOGICAL_ORDER опцию, чтобы быть уверенным, что все последующие сегменты логического сообщения будут обработаны. Если опция MQGMO_LOGICAL_ORDER не задана, то приложение может выбирать сообщения на основе опций соответствия MatchOption, как это делалось для групп:

    • MQMO_MATCH_MSG_ID,
    • MQMO_MATCH_CORREL_ID,
    • MQMO_MATCH_GROUP_ID,
    • MQMO_MATCH_MSG_SEQ_NUMBER,
    • MQMO_MATCH_OFFSET.


    Приложение может определить окончание работы с сегментом с помощью возвращаемого MQGET значения поля MQGMO.SegmentStatus, которое может принимать следующие значения: MQSS_SEGMENT, MQSS_LAST_SEGMENT, MQSS_NOT_A_SEGMENT.

    В завершение раздела можно привести примеры использования функций MQPUT и MQGET для вывода и чтения логических сообщений в соответствии с их структурой, указанной на рис.11.1.

    /* пример использования функции MQPUT */ PMO.Options = MQGMO_LOGICAL_ORDER + MQGMO_SYNCPOINT

    /* помещаем в очередь первое логическое сообщение*/ MQPUT (MQMF_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_MSG_IN GROUP + MQMF_LAST_SEGMENT)

    /* помещаем в очередь второе логическое сообщение */ MQPUT (MQMF_MSG_IN GROUP + MQMFSEGMENT)

    /* помещаем в очередь третье логическое сообщение */ MQPUT (MQMF_LAST_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_ LAST_MSG_IN GROUP + MQMF_LAST_SEGMENT) MQCMIT

    /* пример использования функции MQGET */ GMO.Options = MQGMO_LOGICAL_ORDER + MQGMO_SYNCPOINT + MQGMO_ALL_MSGS_AVAILABLE + MQGMO_WAIT Do while (Not MQGS_LAST_MSG_IN GROUP and Not MQSS_LAST_SEGMENT) MQGET (........) /* обработка сегмента или законченного логического сообщения */ End MQCMIT


    Содержание раздела