Параллельные процессы в шаблоне нового модуля

  !   Данная информация предназначена только только для IT-специалистов по системной интеграции модулей БИОСОФТ-М. (см. Руководства пользователя к программным продуктам)

Теперь создавать безопасные параллельные процессы стало совсем просто. В Absex заготовлен пример функции, код которой можно выполнять параллельно:

void CAbsexParaApiInproc::RunParallelTasks()
{
    вписать сюда свою параллельную задачу
}

и через объект CAbsexParaApiIface можно запустить ее выполнение в параллельном процессе и передать параметры. В примере параллельная задача просто анимирует свой параметр - текстовую строчку, пересылаемую ей из главного процесса:

    rAbsexParaApi->InitAbsexParaApi();
    rAbsexParaApi->EnsureAbsexParaApiProcessStarted();
 
    rAbsexParaApi->AnimateDemoString("Animate me!");
 
    rAbsexParaApi->CloseAbsexParaApi();

Таких задач можно запустить сколько угодно как с использованием реального внешнего процесса так и в рамках основного:

Параллельный код

Вышенарисованная анимация осуществляется кодом

void CAbsexParaApiInproc::RunParallelTasks()
{
    //DEMO: animation
 
    this->x_sDemoAnimation =
        x_sDemoAnimation.Get().GetTruncateLeft(1) +
            x_sDemoAnimation.Get().GetLeft(1);
 
    _m_ptrmapUiAbsexParaApiAnimation.InvalidateState();
}

Вместо него следует разместить свой прикладной код для параллельного выполнения. Это должна быть одна минимальная по длительности итерация дабы как можно скорей вернуть управление системе. Эта функция будет вызываться постоянно позволяя отрабатывать очередные шаги параллельной задачи.

UI парасервиса

Практически полезные параллельные задачи чаще всего не будут с организацией UI. Оно может быть парасервису вообще не нужно. Для таких задач как фоновый опрос приборов, обработка сигналов, передача данных по сети нужно оставить только отладочное UI.

В примере в Absex параллельный процесс показывает свое UI по команде от главного:

    rAbsexParaApi->InitAbsexParaApi();
    ...;
 
    rAbsexParaApi->ShowAbsexParaServiceProcessUi(true);
    rAbsexParaApi->AnimateDemoString("Animate me!");

Простая логика управления этим UI полностью реализована в заготовке проекта и не является частью парасервисных библиотек. Под конкретное приложение ее нужно либо скрыть от пользователя (CCES) либо интегрировать в основное UI приложения.

Псевдо-параллельность

Класс CAbsexParaApiIface инкапсулирует всю коммуникацию приложения с экземпляром параллельной задачи. Absex демонстрировает две унаследованной реализации этого объекта:

  • CAbsexParaApiCallParallel- запускает внешний параллельный процесс и передает ему параметры задачи
  • CAbsexParaApiInproc - выполняет задачу в рамках процесса, в котором объект создан (in-proc)

При реализации своей задачи нужно следить за тем чтобы оба режима работы сохраняли работоспособность. Даже если выполнение задачи в основном процессе совершенно не практично для конечного пользователя, все равно это намного удобней для отладки чем возня с отладкой нескольких процессов одновременно.

Кроме того некоторые параллельные задачи может оказаться эффективней выполнить в рамках основного процесса когда они могут завершиться очень быстро и переключаться на параллельное выполнение только когда параметры требуют существенного времени ожидания результата.

В демо-коде параллельность новой задачи выбирается на основании чекбокса в тестовой панели управления главного процесса:

ref<CAbsexParaApiIface> CAbsexParaMapImpl::NewAbsexParaApiProcess()
{
    if (x_bCreateNewAbsexAsParallelProcess)
    {
        // Real parallel process enabled
        return ref<CAbsexParaApiCallParallel>();
    }
    else
    {
        // External processes disabled, do the work in the main process
        return ref<CAbsexParaApiInproc>();
    }
}
Передача параметров

В CAbsexParaApiIface инкапсулируются все функции настройки параллельной задачи. В зависимости от режима запуска задачи параметры

  • либо постятся внешнему процессу через интерфейсParaService-Post(CAbsexParaApiCallParallel),
  • либо просто передаются коду в текущем процессе (CAbsexParaApiInproc).

Например функция AnimateDemoString(sAnimateMe) для парасервисной реализации:

void CAbsexParaApiCallParallel::OnAnimateDemoString(
        str sAnimateMe)
{
    // change parameters for the parallel process
    x_rAbsexParaServiceControlPost->x_sDemoAnimate = sAnimateMe;
 
    // repost current task state to the parallel process
    RepostAbsexParaPost();
}
void CAbsexParaApiCallParallel::RepostAbsexParaPost()
{
    ...
    // post and start a process if not already running
    x_rAbsexParaServiceControlPost->
        RunParaServicePost(
            sPostId,
            sInstanceId);
}

а для in-proc просто:

void CAbsexParaApiInproc::OnAnimateDemoString(
        str sAnimateMe)
{
    this->x_sDemoAnimation = sAnimateMe + "    ";
}

Естественно когда RunParaServicePost() передаст этот параметр параллельному процессу тот в свою очередь передаст его своему объекту CAbsexParaApiInproc::OnAnimateDemoString(). То есть для параллельного режима работы мы имеем два экземпляра объекта AbsexParaApi: один CAbsexParaApiCallParallel в основном процессе передающий параметры другому объекту CAbsexParaApiInproc уже в параллельном процессе. В in-proc же режиме у нас всего один CAbsexParaApiInproc объект в основном процессе.

Main process: ParaApiCallParallel ---> ParaService process: ParaApiInproc
   Main process: ParaApiInproc
Добавим параметр

Чтобы лучше понять всю цепочку передачи данных параллельной задаче добавим еще один параметр парасервису. Предположим нам понадобилось выбирать одну из двух параллельных задач: анимирование текстовой строчки прокруткой или мигание ею. То есть type<>:

class CAbsexParaTaskTypeImpl
{
 
    // Selectable task
    str RunParaTask(
            str sDemoAnimation)
            new virtual auto;
 
};
 
    // Scrolling animation
    class CAbsexParaTaskTypeForAnimate : public CAbsexParaTaskTypeImpl
    {
        virtual str OnRunParaTask(
                str sDemoAnimation)
                return
                    // scroll string to the left one char at a time
                    sDemoAnimation.GetTruncateLeft(1) +
                        sDemoAnimation.GetLeft(1);
    };
 
    // Flashing
    class CAbsexParaTaskTypeForFlash : public CAbsexParaTaskTypeImpl
    {
        virtual str OnRunParaTask(
                str sDemoAnimation)
                return
                    // flash string color
                    FlipRtfColor(sDemoAnimation);
    };
Поле поста

Все данные парасервису передаются экспозицией объекта поста куда мы и добавляем новый параметр задачи:

class CAbsexParaServiceControlPost : public CInterprocPostIfaceGp
{
    str x_sDemoAnimate = ...
 
    //DEMO: A task to perform
    type<CAbsexParaTaskTypeIface> x_typeAbsexParaTaskType,
            auto(Get, Set);
 
    bool x_bShowAbsexParaServiceUi = ...
};
Функция в ParaApi

В абстрактный интерфейс параллельной задачи добавляем публичную функцию изменения ее параметра:

class CAbsexParaApiIface : public object
{
    void InitAbsexParaApi(...
 
    void CloseAbsexParaApi(...
 
    void AnimateDemoString(...
 
    //DEMO: Choose a task to perform
    void SelectParaTask(
            type<CAbsexParaTaskTypeIface> typeAbsexParaTaskType)
            new virtual auto;
 
    void ShowAbsexParaServiceProcessUi(...
}
Пересылка параметра

Реализация этой функции для ApiCallParallel просто изменяет один из параметров постоянно хранимого поста и перепосылает все(!) параметры параллельному процессу:

void CAbsexParaApiCallParallel::OnSelectParaTask(
        type<CAbsexParaTaskTypeIface> typeAbsexParaTaskType)
{
    rASSERT(IsAbsexParaApiOpened()); // base class takes care
 
    // change parameters for the parallel process
    x_rAbsexParaServiceControlPost->x_typeAbsexParaTaskType =
        typeAbsexParaTaskType;
 
    // repost current task state to the parallel process
    RepostAbsexParaPost();
}
Прием параметра

На приемном конце в параллельном процессе все посылки получает альтернативный ParaLoader используемый в параллельном процессе вместо стандартного AbsexLoader. Объект ApiInproc является его мембером и получает обновление всех инструкций:

void CAbsexParaLoaderImpl::HandleAbsexParaServiceControlPost(
        ref<CAbsexParaServiceControlPost> rAbsexParaServiceControlPost)
{
    // Pass parameters to the parallel task
    x_rAbsexParaApiInproc->
        AnimateDemoString(
            rAbsexParaServiceControlPost->x_sDemoAnimate);
 
    // Pass parameters to the parallel task
    x_rAbsexParaApiInproc->
        SelectParaTask(
            rAbsexParaServiceControlPost->x_typeAbsexParaTaskType);
 
    // Handle main UI state
    ShowAbsexParaServiceMainWindow(
        rAbsexParaServiceControlPost->x_bShowAbsexParaServiceUi);
}
Использование параметра

Теперь в параллельном процессе в объект CAbsexParaApiInproc поступает вызов функции SelectParaTask() и тоже самое происходит в основном процессе в in-proc режиме. Только в основном процессе в отличие от переходников и постов этот параметр просто передается через самую обычную виртуальную функцию:

void CAbsexParaApiInproc::OnSelectParaTask(
        type<CAbsexParaTaskTypeIface> typeAbsexParaTaskType)
{
    this->x_typeSelectedAbsexParaTaskType =
        typeAbsexParaTaskType;
}

где x_typeSelectedAbsexParaTaskType это новый атрибут хранящийся в икапсуляторе задачи:

class CAbsexParaApiInproc : public CAbsexParaApiIface
{
    str x_sDemoAnimation = ...
 
    //DEMO: A task to perform
    type<CAbsexParaTaskTypeIface> x_typeSelectedAbsexParaTaskType,
            auto(Get);
};

используем его при выполнении задачи:

void CAbsexParaApiInproc::RunParallelTasks()
{
    //DEMO: animation
 
    // execute task
    //this->x_sDemoAnimation =
    //    x_sDemoAnimation.Get().GetTruncateLeft(1) +
    //        x_sDemoAnimation.Get().GetLeft(1);
 
    // execute polymorphic task
    this->x_sDemoAnimation =
        x_typeSelectedAbsexParaTaskType->
            RunParaTask(
                x_sDemoAnimation);
 
    _m_ptrmapUiAbsexParaApiAnimation.InvalidateState();
}

Я специально привел пример параметра выбирающего логику а не просто посылающего данные. Не смотря на то, что в шаблоне модуля предусмотрен только один класс инкапсуляции парасервиса ему ничто не мешает выполнять совершенно разные задачи.

В реальном приложении вынесите всю прикладную логику (из RunParallelTasks(), RunParaTask()) за пределы пакета управления парасервисом. В пакете ParaService должен находится только код необходимый для организации параллельных процессов. Не надо мешать туда и приложение!

Отладка

В командной строке

   /ParaService:ParaAbsex_0

заставит модуль запустится как парасервис с sInstanceId = 0. Запускать его надо из отдельного IDE до запуска его основного процесса. Под отладчиком механизм автоматической терминации невостребованного парасервиса блокируется.

Ну и естественно основную часть прикладной логики гораздо проще отлаживать in-proc для чего предусмотреть скрытый переключатель режима запуска демонстрируемый в UI Absex.

Заготовка тестов CAbsexParaTests прогоняет ParaLoader и его ParaApiInproc на нескольких итерациях и дампит их внутреннее состояние используя предусмотренный для этого механизм эмуляции командной строки в тестах. В параллельном процессе ClassTests не выполняются, будучи бессмысленным дублированием при правильной инкапсуляции парасервисов в приложении.

Структура исходников

Все классы организации парасервисов в Absex собраны в пакете ParaService.

Правки в остальных файлах заготовки проекта не существенные, не влияющие на его логику:

  • CAbsexProject:: ref<CAbsexParaLoaderIface> x_rAbsexParaLoader;
  • CAbsexProject.cpp: if (!x_rAbsexParaLoader->IsStartedAsAbsexParaService()) { загрузка дополнительныхмодулей }
  • CAbsexProject.cpp: if (!x_rAbsexParaLoader->IsStartedAsAbsexParaService()) { x_rAbsexLoader->StartApplication(); }
  • CAbsexProject::OnShutdownProject(): x_rAbsexParaLoader->ShutdownAbsexParaServiceLoader();
  • CAbsexSessionImpl::x_rAbsexParaMap
  • OnInitAbsexSession: x_rAbsexParaMap->InitAbsexParaMap()
  • OnCloseAbsexSession: x_rAbsexParaMap->CloseAbsexParaMap()
  • //DEMO: UiAbsexSessionMain: NewUiAbsexParaMapControlPanel()
  • //DEMO: UiAbsexSessionMain: x_rAbsexParaMap->x_rAbsexParaApiDemoProcessSecond->AnimateDemoString()
ParaLoader

Когда модуль загружается в качестве парасервисного процесса не используется ни основной Loader ни Session. Их обоих заменяет CAbsexParaLoaderIface который реализует прием информации от основного процесса и запуск объекта, выполняющего задачу:

class CAbsexParaLoaderImpl : public CAbsexParaLoaderIface
{
 
    // Inproc implementation for the required application functions
    ref<CAbsexParaApiInproc> x_rAbsexParaApiInproc,
            auto(Get);
 
...
 
    // This Post handler must adhere to PFN_Handler signature in onpost template<>
    //   This is a ParaService post handler.
    void HandleAbsexParaServiceControlPost(
            ref<CAbsexParaServiceControlPost> rAbsexParaServiceControlPost);
    onpost<HandleAbsexParaServiceControlPost> _m_onpostAbsexParaService;
};

После успещной инициализации процесса как вторичного параллельного в CAbsexParaLoaderImpl::OnTryInitAsAbsexParaServiceProcess() запрещено обращаться к обычному лоадеру, сессии и конфигу. ParaLoader поддерживает создание своего собственного главного окна (UiAbsexParaServiceMainWindow), играет роль сессии организуя весь цикл жизни x_rAbsexParaApiInproc и получает всю необходимую конфигурацию задачи как параметры поста CAbsexParaServiceControlPost.

Существующей реализации ParaLoader достаточно для организации одного или множества параллельных процессов одного класса. Никаких модификаций для реального приложения в нем не требуется.

На стороне главного процесса ParaLoader не используется после того как IsStartedAsAbsexParaService() вернет false:

void CAbsexProject::OnInitStandalone()
{
...
 
//VL: 2012-04-06
    // Don't start ParaService UI until requested
    //   and don't start the Session ever!
    if (!x_rAbsexParaLoader->IsStartedAsAbsexParaService())
    {
//VL.
        // Create main UI and start running as an application
        x_rAbsexLoader->StartApplication();
    }
}
ParaMap

На стороне главного процесса все множество параллельных и псевдо-параллельных задач инкапсулируется и управляется через один объект CAbsexParaMapIface.

Если по природе приложения каждая сессия может иметь свой собственный набор параллельных процессов, то x_rAbsexParaMap является пропертем сессии. Этот решение заложено в шаблон. Но если параллельный процесс принципиальный singleton и сессии не имеют никакой индивидуальности в общении с ним то ParaMap надо сделать глобальной пропертью проекта. Шаблон это не демонстрирует.

В Absex ParaMap демонстрирует два альтернативных способа инкапсуляции объектов процессов - как проперти и как массив:

    ref<CAbsexParaApiIface> x_rAbsexParaApiDemoProcessFirst,
            auto(Get);
    ref<CAbsexParaApiIface> x_rAbsexParaApiDemoProcessSecond,
            auto(Get);
    ref<CAbsexParaApiIface> AddAbsexParaApiProcess();
    ptr<CAbsexParaApiIface> LookupAbsexParaApiProcess(...);
    bool IterateAbsexParaMap(...)

Оставьте только один из них!

Если процессов конечное количество то их удобней идентифицировать пропертями. Можно оставить массив для их итерации но убрать публичное Add().

Если процессов переменное количество то надо удалить проперти.

В OnInitAbsexParaMap() инициализация процессов делается чисто для забавного демо. Решите надо ли загружать ваши процессы сразу и какие начальные параметры передать.

ParaMap обеспечивает чисто демонстрационную панель управления процессами которая должна быть убрана в CCES если не будет полностью заменена специфичным для вашего приложения UI.

Удаление из проекта

Чтобы удалить из нового модуля всю описанную функциональность достаточно удалить пакет ParaService и все ссылки на словосочетание "AbsexPara" ("МойпроектPara") из остальных файлов. Проект останется полностью работоспособным.

Но в отличие от интегрированного PexContact удалять поддержку парасервисов не видится никогда полезным. Пока он не используется он никак не замутняет код основного приложения. А представить себе проект в котором абсолютно точно никогда параллельные задачи не понадобятся сложно.

Посылки в ответ

CAbsexParaServiceResponsePost добавлен для демонстрации отсылки ответных данных из параллельного процесса главному. Эта функциональность не обеспечивается ParaService и не имеет к нему никакого отношения. Это обычный Interproc-Post. CAbsexParaApiInproc делает демо-пост по клику на тестовой кнопке а принимает посылку ParaMap и показывает счетчик:

Так как эта функция никак к парасервису не привязана то данное демо не различает от какого именно процесса приходит счетчик. Вы можете добавить идентификацию источника еще одним полем CAbsexParaServiceResponsePost если это необходимо (см. GetAbsexParaApiInstanceId()).

Bonus: High Speed LL Channel

К данной заготовке прилагается один Разделяемый буфер для передачи потоков сигналов между процессами. Циклический, демонстрирующий эффективную высокочастотную онлайновую передачу потока простых по структуре данных в направлении от парасервисных процессов к главному.

Он абсолютно никак не относится к парасервисному фреймворку и является альтернативой использованию экспозируемых посылок CAbsexParaServiceResponsePost.

Я вставил его в Absex во избежание катастрофического абюза Interproc-Post который нельзя использовать для передачи онлайновых потоков. Для передачи сигналов, трендов, производных индексов и других данных посылки которых могут происходит чаше 1/раз в секунду или более 1000/раз в минуту нужно использовать циклический буфер. Скажем так: посты можно использовать например для передачи информации о комментариях проставляемых пользователем в онлайн но нельзя использовать для передачи событий от автоматической детекции особенностей сигналов.

CAbsexParaChannelImpl инкапсулирует CInterprocRingIfaceGp и оформляет ему интерфейс для посылки и приема блоков данных в формате приложения:

class CAbsexParaChannelImpl : public object
{
    // Alloc shared memory
    void InitAbsexParaChannel(
            int iChannel,
            out str& out_sError);
 
    // Cleanup
    void CloseAbsexParaChannel();
 
    // Send data into the ring buffer
    void StoreAbsexParaChannelBlock(
            point pointDemoDataBlock,
            out str& out_sError);
 
    // Read data from the ring buffer
    bool RetrieveAbsexParaChannelBlock(
            out point& out_pointDemoDataBlock,
            out int& out_nOverflowCount);
};

Для демо параллельные процессы передают (x_rAbsexParaChannelWriter) координаты мыши в окне UiAbsexParaApiInproc которые принимает m_rAbsexParaMap->x_rAbsexParaChannelReader.

ParaMap выводит одну из координат в виде символа а другая определяет цвет в бегущей строке панели управления демонстрируя приходящий поток данных и переполнение буфера если двигать мышью слишком быстро:

Для демонстрации переполнения буфера размер ему задан абсурдно маленький. Реальное приложение должно предусматривать буфер с гигантским запасом на максимально возможное торможение приемника ибо переполнение рушит поток данных непредсказуемым образом.

Возможности отличить от какого процесса пришли данные не предусмотрено. Канал идентифицируется по текстовому ID (C_sSharedNameForAbsexParaChannel) + номеру (0) и в примере только один. Ничто не мешает создать несколько каналов в том числе и от главного процесса к параллельным(-ому). Если формат их данных идентичен то можно использовать один инкапсулирующий класс и менять только ID. Для других форматов надо создавать отдельные инкапсуляторы по аналогии с примером.

Таким буфером перегрузить систему сложнее чем постами. И последствия переполнения локальные и не фатальные для всего вокруг. Но тем не менее нужно понимать опасность рейс кондишенов когда отдельные посылки данных зависят друг от друга или от их прихода в определенный момент времени зависит логика приложения. Я специально в примере пересылаю две координаты x и у в рамках одного блока. Их отсылка по отдельности в сочетании с совместной обработкой на приеме без специальных мер буферизации которые очень тяжело отладить может вызвать катастрофические последствия для приложения.

Checklist

Что нужно сделать с заготовкой своего проекта:

  • Пока параллельность не нужна:
    • закомментировать UIв окне Session: "// ParaService Panel"
    • закомментировать код инициализации демо-сервисов вслед за "//DEMO: Add a couple of services." в OnInitBijouParaMap(),
    • убедится что в командной строке IDE не осталось "/ParaService:...",
    • ничего более удалять не нужно чтобы сабж себя никак не проявлял.
  • При возникновении потребности в реализации параллельной задачи отладьте ее на таймерах и Idle. Только убедившись что не-параллельная реализация абсолютно не приемлема рассматривайте возможность запуска дополнительного процесса. Не питайте никаких иллюзийна счет простоты отладки и сопровождения параллельных программ.
  • При созревании твердой уверенности что оптимизация распараллеливанием процессов абсолютно вам необходима разместите прикладную логику параллельной задачи в отдельном пакете (или другом модуле). И вызывайте ее из RunAbsexParallelTasks (или из RunAbsexParaTaskчтобы поддержать несколько разных задач полиморфно).
  • Помните, что параллельный процесс должен прокачивать сообщения иначе будет терминирован ParaService-библиотеками или ОС. Разбивайте задачу на минимальные дискреты и быстро возвращайте управление из RunAbsexParallelTasks.
  • Если задачу совсем не реально разбить на короткие (до 1-3 с.) итерации и она на долго забирает процессор, то обращайтесь к VLза консультацией.
  • Убедитесь что параллельная задача не использует конфиг, лоадер или сессиюосновного процесса. И никаких других ссылок и указателей заведомо не доступных в другом процессе.
  • В #ParaMap нужно оставить только один механизм создания объектов процессов - проперти или массив-Add.
  • Добавьте свои параметрыпроцесса вместо демонстрационных как описано в примере выше.
  • Из приложения вызовите функции своего объекта ParaApi для изменения параметров процесса когда это нужно. Не пытайтесь передавать высокочастотные потоки таким образом!
  • Если необходимо задействуйте response-пост для возврата низкочастотных результатов в главный процесс.
  • При возникновении потребности в высокочастотных каналах между процессами воспользуйтесь примером и модифицируйте его под свою низкоуровневую структуру передаваемого блока с использованием только безопасных sbuf функций как в примере.
  • Никто не мешает использовать обычные файлыдля формулировки параметров параллельным задачам. Их можно копить в директории и использовать как очередь задач. Этот подход страдает очевидными недостатками если удаление мусора и блокировки файлов на одновременный доступ плохо продуманы.
  • Не грузите заранее не нужные в параллельном процессе модули чтобы не замедлять его старт и экономить ресурсы. См. проверку на  if (!x_rAbsexParaLoader->IsStartedAsAbsexParaService()) в OnInitProject().
  • Beware of race coditions! Продумайте все возможные рейс кондишены. Убедитесь, что ни при каких даже самых невероятных последовательностях коммуникации логика вашего приложения не может сбиться. Не рассчитывайте на тестирование. Оно в принципе не может выявить всех буг временных диаграмм параллельных процессов! Когда они выявятся у ваших пользователей вы хрен воспроизведете их у себя.