Безопасная посылка данных процессам через Interproc-Post

  !   Данная информация предназначена только только для IT-специалистов по системной интеграции модулей БИОСОФТ-М. (см. Руководства пользователя к программным продуктам)

Interproc-Post интерфейс предназначен для посылки сообщений и данных между параллельными процессами.

Главная особенность механизма - асинхронность в духе событийной управляемости. Посылаемые Post-события размещаются в очереди сообщений принимающего процесса (FIFO) и отрабатываются вместе с остальными сообщениями от системы.

В отличие от синхронной коммуникации в данном случае посылающий процесс не ждет завершения вызова, не блокируется и продолжает работать дальше. Очевидная особенность этого - невозможность сразу же получить ответ от получателей сообщения.

Каждое сообщение Interproc-Post представлено классом в С++ и каждый канал рассылки сообщения идентифицируется уникальным текстовым идентификатором - общим для всех процессов.

Каждая транзакция посылки сообщения инициируется посылателем в одном из процессов и принимается любым количеством хандлеров в других процессах.

Параметрами сообщения является обычный объект, публичные проперти и приватные данные которого экспозируются в рамках посылающего процесса и восстанавливаются в процессах получателях.

С++ Интерфейс

Для данного сервиса Interproc публикует следующие классы и методы:

CInterprocPostIfaceGp- базовый класс для всех классов параметров сообщенияего метод rMyPost->PostBroadcast("Id"): отсылает пакетonpost<>: template<> инкапсулирующее получатель сообщенияего метод _m_onpostMyHandler->OpenPostOffice("Id", this): включает прием пакетовколбак функция void CMyClient::HandleMyPost(ref<CTestInterPostPackage>) предоставляется приложением и вызывается библиотекой.

Общий принцип применения

Приложение

наследуетCInterprocPostIfaceGpи создает свой класс с данными сообщения.Выбирает уникальный идентификатор класса/канала сообщения (обычно совпадает с именем класса данных).Посылатель подготавливает объект с данными и посылает его методом PostBroadcast().На принимающей стороне(нах) во всех объектах желающих получать сообщения имеется onpost<> объект, инициализированный на приемку сообщений определенного класса.

При получении сообщения onpost<> вызывает указанную ему функцию-хандлер которая и выполняет все необходимые приложению действия по реакции на сообщение.

Краткий псевдокод декларации посылки, ее отсылки и получения:

// Derive specific message parameters from the package base class
class CMyXxxxPost : public CInterprocPostIfaceGp
{
public:
 
    C_sMyXxxxPostId = "MyXxxxPost";
 
    xxxx x_xMyData = ...;
};
 
void Sender()
{
    // Prepare data to be sent
    ref<CMyXxxxPost> rMyXxxxPost;
    rMyXxxxPost->x_xMyData = ..........;
 
    // Post the data into all initialized receiver processes FIFO
    rMyXxxxPost->PostBroadcast(C_sMyXxxxPostId);
}
 
class CReceiverClient : public object
{
...
 
private:
 
    // This Post handler must adhere to PFN_Handler signature in onpost template<>
    void HandleMyXxxxPost(
            ref<CMyXxxxPost> rMyXxxxPost);
    onpost<HandleMyXxxxPost> _m_onpostMyXxxxPost;
 
    // We can have many handlers for different Post package
    //   classes in a single client class
    void MyYyyyyyyyPost.....
            ref<CMyYyyyyyyyPost>....
    onpost<MyYyyyyyyyPost> _m_....
 
};
 
CReceiverClient::Init()
{
    // Initialize the handler and link back to our object
    _m_onpostMyXxxxPost.OpenPostOffice(C_sMyXxxxPostId, this);
}
 
void CReceiverClient::HandleMyXxxxPost(
        ref<CMyXxxxPost> rMyXxxxPost)
{
    // we have received a copy of the package here
    TESTLINE("", "We have received a My Xxxx Post: " + rMyXxxxPost->x_xMyData);
    ....
}
Подробная последовательность применения
Подготовка процессов

Организуем ситуацию когда два (или более) наших процесса запущены параллельно, проинициализированы, загрузили все модули c необходимыми классами и готовы к обработке сообщений.

Кстати сигнал о готовности запущенный процесс может послать остальным как раз через специально предусмотренный Interproc-Post.

В частном и самом простом случае один и тот же .exe может запускаться как два процесса и общаться сам с собой - это самый простой способ коммуникации. В любом случае модуль DLL, реализующий посылку и прием данного сообщения должен быть загружен в рамках всех общающихся процессов.

(Пример: Paradop.dll запускается как StartParadop.exe одновременно с несколькими доплерными приложениями, подзагружающими Paradop.dll. Клиентские приложения ничего не ведают о том как идет общение Paradop.dll <--> Paradop.dll через границу процессов, так как Paradop.dll все инкапсулирует)

Декларация пакета и отсылка.

1.1) Создаем приватный класс для хранения посылаемых данных (и возможно команд). Этот класс обязательно наследуется от CInterprocPostIfaceGp.

Все данные должны содержаться в пропертях или приватных мемберах так чтобы обеспечивалась обычная экспозиция. Ничем не отличается от объектов, которые мы сохраняем и грузим из файлов.

Имя класса обычно строим по принципу CПрефикспроектаPostXxxxx

Пример: для отсылки состояния прибора

class CParadopPostDeviceState : public CInterprocPostIfaceGp
{
...
    // Pedal #0
    bool x_bPedalPressed = false,
            auto(Get, Set);

1.2) Рекомендуется иметь в проекте пакет, занимающийся всей межпроцессной коммуникацией приватно, принимающий вызовы и выдающий колбаки наружу в виде обычных Iface/IfaceGp интерфейсов. (см. Paradop/Intercom)

1.3) Каждый тип поста идентифицируется статической текстовой строкой. Она обязательно должна быть глобально уникальной. Обычно она следует из названия класса пакета с данными. Например:

class CParadopPostDeviceState
{
 
    static const str C_sParadopPostDeviceStateId = "ParadopPostDeviceState";

(см. примечания по вопросу локализации посылок отдельным объектам добавками к этому строковому ID)

1.4) Собственно отсылаем данные в момент когда это нужно сделать. Для этого достаточно создать локальный объект с данными, заполнить проперти и вызвать PostBroadcast():

        ref<CParadopPostDeviceState> rParadopPostDeviceState;
 
        rParadopPostDeviceState->x_bPedalPressed = bPressed;
 
        rParadopPostDeviceState->
            PostBroadcast(
                CParadopPostDeviceState::C_sParadopPostDeviceStateId);

Вызов этот естественно инкапсулирован в объекте, отвечающем за коммуникацию а не разбросан copy/paste повсюду!

void CParadopIntercomDeviceImpl::OnBroadcastDeviceState(
        int iPedal,
        bool bPressed)
{
    if (iPedal == C_iMainPedal)
    {
        ref<CParadopPostDeviceState> rParadopPostDeviceState;
 
        rParadopPostDeviceState->x_bPedalPressed = bPressed;
 
        rParadopPostDeviceState->
            PostBroadcast(
                CParadopPostDeviceState::C_sParadopPostDeviceStateId);
    }
}

После отсылки нам тут больше делать нечего. Узнать о результатах немедленно мы не можем. (См. соотвествующий раздел о том как организовать пересылку результатов обработки сообщения обратно с помощью еще одного Post) Сообщения получат все получатели зарегистрированные на момент отсылки.

2) Прием Post сообщений

2.1) Обычно в том же пакете, который отсылает посты имеем класс который их и получает. Технически это может быть тот же самый коммуникационный класс. (Если плевать на структурность, где то в тесте/прототипе, то можно налепить и посылатель и приемник в один класс вместе с декларацией данных) Но чтобы избежать путаницы, рекомендуется отдельный класс приемника. В нем в private :!: объявляем хандлер сообщения в следующей форме:

class CParadopIntercomClientImpl : public CParadopIntercomClientIface
{
...
private:
 
    // This Post handler must adhere to PFN_Handler signature in onpost template<>
    void HandleParadopPostDeviceState(
            ref<CParadopPostDeviceState> rParadopPostDeviceState);
    onpost<HandleParadopPostDeviceState> _m_onpostParadopPostDeviceState;

Здесь мы декларировали объект onpost<> - приемник сообщения и указали ему (безопасный) адрес функции которую надо вызывать для обработки сообщения.

2.2) Инициализируем объект приемника через onpost<>::OpenPostOffice() указав тот же строковый ID что и при посылке:

void CParadopIntercomClientImpl::OnInitParadopIntercomClient(
        ref<CParadopClientSessionIface> rParadopClientSession)
{
    _m_onpostParadopPostDeviceState.
        OpenPostOffice(
            CParadopPostDeviceState::C_sParadopPostDeviceStateId,
            this);
}

после выполнения этого и возврата управления системе для обработки сообщений может начать вызываться хандлер:

void CParadopIntercomClientImpl::HandleParadopPostDeviceState(
        ref<CParadopPostDeviceState> rParadopPostDeviceState)
{
    if (rParadopPostDeviceState->x_bPedalPressed)
    {
        m_rPedalReceiver->HandlePedalPressed();
    }
}

Все, готово. Остается убедится что объект, содержащий хандлер и переданный как второй параметр OpenPostOffice(..., this) будет сущестовать все время пока необходима обработка данного события. НЕ рекомедуется оперативно создавать и удалять хандлерные объекты. В отличие от временного объекта при отсылке, приемник должен существовать постоянно для стабильной работы всей Post системы.

Unit Tests

Посылку можно вызывать из class-test и она не будет иметь глобальных эффектов. Хандлер приемника для тестирования тест должен вызывать вручную, проверяя все возможные моменты времени прихода сообщений и их количество. Автоматический вызов хандлера сразу после PostBroadcast() не эмулируются за нереалистичностью идеалистисного сценария когда сообщение принимается сразу после посылки.

Инкапсуляция Interproc-Post

Не только прием но и посылка сообщений должна быть глубоко инкапсулирована в приватных классах модуля (DLL). Абсолютно неприемлемо чтобы один модуль публиковал классы пакетов унаследнованные CInterprocPostIfaceGp и текстовые идентификаторы каналов посылки чтобы другие модули их принимали.

Правильный дизайн когда один приватный класс пакета в модуле посылает сообщения другому приватному классу в том же пакете того же модуля загруженному в рамках другого процесса. Если реальный прикладной источник и получатель сообщения находятся в других пакетах и модулях то для них должны быть предоставлены обычные Iface и колбаки.

Идеально видимо будет всегда выделять отдельный пакет в проекте который приватно занимается межпроцессными пересылками и все остальное приложение работает не зная ничего о том как происходит коммуникация между его параллельно сосуществующими объектами.

Данные, пересылаемые с сообщением

Класс, унаследованный от CInterprocPostIfaceGp должен быть построен так чтобы обычная экспозиция могла восстановить все данные посылки. Кроме простых типов проперти (int, str, ...) которые восстановимы всегда без проблем можно использовать и сложные полиморфные объекты и их иерархии. Естественно при этом нужно позаботится чтобы все получающие процессы загрузили все необходимые модули для восстановления объектов этих классов дотого как они получат первую послылку.

Учитывая возможность полиморфного восстановления объектов и использования type<> есть возможность в рамках одного класса посылок Interproc-Post реально передавать разные сообщения, реакцию на которые принимающий процесс будет выбирать вызовом виртуальной функции. На практике это требует аккуратной инкапсуляции посылателя и получателя сообщения так чтобы они находились в одном модуле (DLL).

Поддержка совместимости формата между разными версиями программ не очень критична. Обычно мы не провоцируем пользователя на параллельный запуск наших процессов разных версий.

Тонкая идентификация получателей.

Interproc-Post не имеет объектно-ориентированной индентификации объектов-получателей. Текстовый идентификатор принципиально глобальный. Локализация получателей может происходить одним из двух способов:

a.) Динамическими добавками к строковому идентификатору, например:      "DopplerStatusForChannel" + "_" + Str(iChannel) (наличие "_" критично!)

b.) Указанием локализующих данных в объекте сообщения:

class CMyDopplerStatusPost : public CInterprocPostIfaceGp
{
    int x_iForDopplerChannel = ...;
 
    ...
};

и соответствующей фильтрацией (если все объекты каналов имеют хандлер) или диспетчеризацией (если только один центральный хандлер, передающий вызов в требуемый канал) в обработчике.

Событийно управляемая архитектура против "MS DOS"

Нам всем рассказывали байки про то как ОС _____ мол "событийно управляемая" и все мол "на сообщениях". Но для того чтобы прочувствовать разницу между событийным кодом и тупым последовательным алгоритмом рассмотрим до боли знакомые способы создания плавающих и так называемых "модальных" окон.

Что событийно-управляемое приложение делает чтобы ввести данные от пользователя в отдельном окне W?

Оно делает так:

         (new W(rDoc))->CreateWindow;
 
         return;  // return control to the system

Разумеется оно ожидает колбаки в виде отдельных сообщений:

         W::OnDataChanged(data)
         {
             rDoc->data = data;
 
             return;  // return control to the system
         }
 
         W::OnOk()
         {
             CloseWindow;
             rDoc->Update;
 
             return;  // return control to the system
         }

Теперь посмотрим что бывает если ламеры не понимающие этой идеи начинают проектировать эдакий "событийно-управляемый" фреймворк. Рожаются следующие досадные примеры:

        CMyDlg dlg;
        dlg.m_strLeft = strLeft;
        dlg.m_strRight = strRight;
        dlg.m_bRecurse = bRecurse;
 
        if (dlg.DoModal() != IDOK)
            return FALSE;
 
        strLeft = dlg.m_strLeft;
        strRight = dlg.m_strRight;
        bRecurse = dlg.m_bRecurse;
        infoUnpacker = dlg.m_infoHandler;
 private void xxx_Click ( object sender, System.EventArgs e )  
 {  
      InputDialog MyInputDialog = new InputDialog();
      MyInputDialog.MyInput.Text = "abcd";
 
      MyInputDialog.ShowDialog();
 
      string s = MyInputDialog.MyInput.Text;
      .....

Так вот, по опыту UniversalView вы знаете что можно программировать все без блокировки приложения в обработчике сообщения, без принудительной прокачки сообщений в ожидании пока закроется ваш ДОСовский диалог (DoModal=ShowDialog), просто (1) создавая окно и потом (2) реагируя на хандлеры. Диалог ничего не "должен возвращать" в приложение.

Точно также асинхронная коммуникация может быть ПОЛНОЦЕННО реализована без ожидания когда обработчик сообщения(=метода) завершится.

Достаточно послать запрос и отпустить приложение работать дальше. Ответ придет обратно отдельным сообщением.

См. подробный пример в Interproc для соединения процессов ЭЭГ к Monitex

Теперь где аналогия с диалогом заканчивается: Создавая окно вы можете быть уверены что система покажет его, вам не нужно верифицировать этот факт. Коммуникация же между процессами вещь менее детерминированная.

Если, например, ваше приложение не может начать работать не узнав возраст пациента из БД, вы послали в БД Post запрос и... ждете. В этой ситуации может потребоваться создать некоторое плавающее окно, или заглушку заместо всего главного окна. Написать там "Запрос данных из БД...", и добавить кнопку [Abort]. Так же не плохо засекать по таймеру и если ответа нет за критический таймаут спрашивать у юзера в чем дело? прерваться? ждать дальше? Естественно тут тоже нельзя деалать бесконечных (или таймаутных) циклов ожидания, не нужно "прокачек сообщений" и "DoModal". Управление нужно вернуть системе. И к ожидаемым событиям добавить OnTimerTimeout.

Предостережения

1) Запрещено применять Interproc-Post в релизных проектах для намеренной пересылки данных в рамках одного процесса, злоупотребляя им как эдаким универсальным механизмом уведомлений о событиях

2) Обязательно инкапсулировать Interproc-Post, оборачивать его в обычные Iface и колбаки а не публиковать эдакий "дополнительный" интерфейс пакета/модуля мол "примите сообщение Xxxx в объект CXxxxIface". (см. раздел "Инкапсуляция Interproc-Post")

3) Interproc-Post как основанный экспозиции, FIFO и отложенной обработке сообщений принципиально более медленный коммуникационный интерфейс чем низкоуровневая (опасная) передача через разделяемую память. Interproc предоставляет оптимизированные средства коммуникации через разделяемую память но они более опасны в применении - см. Разделяемая процессами память в Interproc

4) Если источник шлет пакеты постоянно быстрее чем приемник будет их обрабатывать приемник может практически повиснуть утонув в сообщениях. Interproc-Post предназначен для разовых событий а не для передачи непрерывных потоков данных (сигналов в режиме реального времени)

5) Освобождение памяти, занятой пакетами происходит сборщиком мустора по Idle. Пока процессы перегружены сообщениями пакеты освобождаться не будут.

6) На низком уровне это все основано на CInterCall интефейсе, который в частности используется и PexContact и UV. Не надо туда лазить, он более опасный сложный, и концептуально принципиально иной нежели описанный тут высокоуровневый.

7) В интефейсе Interproc-Post очевидно в одном классе объединен наследуемый приложением объект с данными посылки и объект отсылателя. Посылка как бы отсылает сама себя. Сделано для кратности документации, не пример для подражания в приложениях, где проще и яснее в таких случаях разделять переопределяемый класс с данными и объект, манипулирующий ими. На приемном конце onpost<> использует трюк с избавлением от деклараций отдельного класса для каждого колбака и дает возможность одному клиенскому классу принимать колбаки посылок разных типов. Ничего подобного не доступно для приложений, которые реализуют колбаки как обычно. В том числе и колбаки из обязательного инкапсулятора Interproc-Post.

BUGS/Todo

Если процесс завершился или фатально повиснет перед тем как успеет отработать уже посланный пакет, этот пакет никогда не будет освобожден сборщиком мусора.