Разработки          Услуги          О компании          Контакты  

инкапсулятор общей памяти CInterprocRingIfaceGp - safe shared memory ring buffer (cycled)

Материал из биософт-м

Перейти к: навигация, поиск
  !   Данная информация предназначена только только для IT-специалистов по системной интеграции модулей БИОСОФТ-М. (см. Руководства пользователя к программным продуктам)

Для оперативной и относительно безопасной пересылки потоков бинарных данных между двумя или более процессами в Interproc реализован CInterprocRingIfaceGp.

Задача

Реализовать универсальную возможность передавать потоки байтов из одного процесса в другой. Оптимизированную для пересылки приборных сигналов между параллельно работающими процессами.

Особенности

  • Буфер циклический, размер фиксируется приложением при инициализации.
  • Выбираемый приложением размер буфера позволяет обеспечить необходимый объем кеширования для избежания потерь выборок.
  • Возможность нескольким процессам считывать данные из буфера одновременно.
  • Не требует блокировок процессов на время доступа к данным за счет специальной безопасной последовательности доступа к управляющим структурам в реализации CInterprocRingIfaceGp.
  • Специальная расширенная возможность точной синхронизации пересылаемых блоков по нескольким буферам идентичной структуры.

Ограничения

  • Только один процесс может писать в буфер.
  • Если читающий процесс не вычитал данные во время часть потока будет потеряна.
  • Посылающему процессу запрещено за один раз размещать байтов на более половины циклического буфера.

Предыстория

Базовый не универсальный код уже использовался в Paradop для пересылки спектра. Унификация потребовалась для Unimod.

Применение

Всем приложениям доступен объект CInterprocRingIfaceGp.


 i  Не забывайте менять ID буфера когда изменяется структура передаваемых данных в новой версии программы, чтобы она даже не пыталась работать со старой!

Идентификация

Каждый разделяемый буфер имеет уникальный глобальный в системе строковый идентификатор. Указывается при инициализации.

Жизненный цикл

Разделяемая память физически остается аллокированной до тех пор пока существует хотя бы один объект CInterprocRingIfaceGp инициализированный с ее идентификатором. Деструктор CInterprocRingIfaceGp автоматически освобождает память. В силу неопределенности деструктора следует вызывать CloseInterprocRing() для освобождения системных ресурсов.

Простая версия интерфейса (рекомендуется)

Все процессы открывают буфер. Только один из них пишет туда (StoreIntoInterprocRing()). Все остальные могут вычитывать (RetrieveFromInterprocRing()).

Принцип FIFO. Кто не успел вычитать тот опоздал и привет кусок сигнала.

    //
    // Simple single channel buffer (recommended)
    //
 
    // Alloc shared ring buffer
    void InitInterprocRing(
            str sGlobalSharedRingName,
            int nRingBytes,
            out str& out_sError)
            new virtual auto;
 
    // Free memory
    void CloseInterprocRing()
            new virtual auto;
 
    // Push bytes into the ring buffer.
    //   I you store more than the reader have anough time to read
    //   the ring system will overflow lossing and corrupting data.
    //   This function will not allow to post more than half the
    //   buffer size.
    void StoreIntoInterprocRing(
            sbuf sbufPortion,
            out str& out_sError)
            new virtual auto;
 
    // Reads all available data moving internal read
    //   pointer ahead.
    //   It returns the number of overflow events happened
    //   since the last read. Non-zero out_nOverflowCount means
    //   data loss (consider increasing nRingBytes then).
    // nRestrictResultBytes either == -1 to ignore or 1000 for Monitex.
    sbuf RetrieveFromInterprocRing(
            out int& out_nOverflowCount,
            int nRestrictResultBytes)
            new virtual auto;

Многоканальный синхронизированный поток

Это более сложный режим без которого могут обойтись большинство приложений.

CInterprocRingIfaceGp имеет альтернативный набор функция для передачи и получения потоков данных сразу по нескольким каналам.

Особенность такого режима в том, что для каждого канала гарантируется прием одновременно одинакового количества данных по каждому из каналов. При этом общее количество данных которое может быть принято или передано за одну итерацию может варьироваться.

В этом режиме один объект CInterprocRingIfaceGp инкапсулирует несколько циклических буферов но с общими указателями записи и чтения потоков. Размер буфера для всех потоков одинаковый и задается при инициализации.

На посылателя в многоканальном режиме накладывается требование передавать данные сразу по всем каналам в одинаковом количестве. (однако объем передаваемых данных за одну итерацию может быть любой, не приводящий к переполнению FIFO)

Данная оптимизация изначально была сделана для Paradop для того чтобы фрагменты звука приходили всегда одинаковыми порциями по всем каналам.

Интерфейс симметричный с одноканальным режимом, но более сложный использует еще один класс канала - CInterprocRollIfaceGp при приеме данных:

class CInterprocRingIfaceGp : public object
{
public:
 
...
 
    //
    // Multichannel buffer (synchronized buffer set)
    //
 
    // Alloc advanced multichannel buffer set
    void InitMultichannelInterprocRing(
            str sGlobalSharedRingName,
            int nRingBytes,
            int nChannels,
            out str& out_sError)
            new virtual auto;
 
    // Multichannel version writes equal portions to a number of channels
    //    as specified. It will write 1...nChannels portions splitting
    //    the data equally from sbufPortionsMerged.
    //    Some channels can be left unwritten with undefined data.
    //    (all channels share the same write and read pointers)
    void StoreIntoMultichannelInterprocRing(
            sbuf sbufPortionsMerged,
            int nSplitForChannels,
            out str& out_sError)
            new virtual auto;
 
    // Reads pointers to all currently available data moving internal read
    //   pointer ahead.
    //   It returns the number of overflow events happened
    //   since the last read. Non-zero out_nOverflowCount means
    //   data loss (consider increasing nRingBytes then).
    // When you get non-zero out_nReadBytes iterate all InterprocRolls
    //   and read the data portions from every channel.
    //   Whole procedure from getting the position to the last read
    //   theoretially should be interlocked. But in practice with a large
    //   enough buffers it will not matter.
    // nRestrictResultBytes either == -1 to ignore or 1000 for Monitex.
    void RetrieveReadPositionFromMultichannelInterprocRing(
            out int& out_iStartReadRollsAt,
            out int& out_nReadBytes,
            out int& out_nDeltaToReadFromStart,
            out int& out_nOverflowCount,
            int nRestrictResultBytes)
            new virtual auto;
 
    // Iterate allocated rolls for multichannel ring.
    bool IterateRollsInMultichannelInterprocRing(
            out iter& out_i,
            out ref<CInterprocRollIfaceGp>& out_rInterprocRoll)
            new virtual auto;

Для чтения из канала используется ReadInterprocRollBlock() на основании данных, полученных из RetrieveReadPositionFromMultichannelInterprocRing():

class CInterprocRollIfaceGp : public object
{
 
    // Allocate shared memory
    void InitInterprocRoll(
            str sBufferName,
            int nBytes,
            out str& out_sError)
            new virtual auto;
 
    //
    // Buffer circulatity is handled in CInterprocRingIfaceGp
    //   because multiple buffers have
    //   the same current positions.
    // Here we allow safe block read/write while
    //   additionally verifying read/write sequencing
    //
 
    // Write a block with optional internal consistency check
    void WriteInterprocRollBlock(
            int iStartAtByte,
            sbuf sbufWrite,
            EPositionCheck ePositionCheck,
            out str& out_sError)
            new virtual auto;
 
    // Read a block with optional internal consistency check
    sbuf ReadInterprocRollBlock(
            int iStartAtByte,
            int nReadBytes,
            EPositionCheck ePositionCheck,
            out str& out_sError)
            new virtual auto;

Для отправки данных в многоканальном режиме можно использовать только StoreIntoMultichannelInterprocRing().

Зачем?

Может возникнуть вопрос - зачем вообще усложнять объект CInterprocRingIfaceGp таким тяжелым интерфейсом а не использовать просто массив из отдельных буферов для многоканального режима? Ну может быть у них будут дублироваться указатели позиций записи и чтения в ситуации когда данные поступают по каналам строго синхронно, ну и что?

Естественно если данные в разных каналах полностью логически независимы и данные в них поступают в разные моменты времени разными порциями то действительно можно сделать массив из CInterprocRingIfaceGp и с каждым каналом работать как с независимым объектом. Так и нужно поступать!

Однако, если (как в Paradop) приложение ожидает, что посылая одинаковые блоки по всем каналам одновременно, оно получает их точно так же одновременно теми же кусками работать не будет. Будет существовать небольшая вероятность рассинхронизации.

Если вы используете несколько буферов одновременно для логически связанных данных ищите сразу проблемы рейс кондишенов которые очень трудно воспроизвести и отладить. (См. теорию в Разделяемая процессами память)

Использование многоканального вычитывания позволяет избежать рейс-артефакта нерегулярности размеров блоков. Если каждый канал данных передается независимо через асинхронный интерфейс без дополнительной блокировки, то размеры вычитываемых блоков будут иногда неожиданно не совпадать по казалось бы синхронным каналам. Даже если передающая сторона синхронно кладет буфера одинакового размера последовательно по каждому каналу. Дополнительная неприятность таких рассинхронизаций то, что их тяжело отлаживать. Вероятность что посылатель добавит данные точно в тот же момент времени что вычитыватель будет принимать данные по другому каналу может быть микроскопической. И тем неожиданней.

Естественно могут существовать и альтернативные многоканальные решения, потенциально даже более эффективные. Например передача блоков всех каналов по одному каналу с разделением их по тому или иному признаку при приеме. Такой подход будет более эффективен с точки зрения локализации доступа к памяти и соответствующего аппратного кеширования. Использование независимых каналов с блокировкой тридов тоже решение, но тоже страдающее своими побочными эффектами.

CInterprocRollIfaceGp

Может потенциально использоваться независимо как разделяемая память произвольного (непоследовательного) доступа. Естественно встает проблема блокировки данных, которая может лучше решаться применением специализированного тепмлейта о котором в свое время будет отдельная статья.

Примеры

Uport ParaService использует массив из одноканальных независимых буферов для передачи данных от каждого из портов. Применение сабжа тут на столько простое что и пояснять нечего.

www.biosoft-m.ru



Просмотры
Личные инструменты