Overview

The lofar_udp_io_* functions offer an interface to one of several I/O targets: normal files or Zstandard compressed files on disk, or in-memory transfers through named FIFO pipes and PSRDADA ringbuffers.

The standard way to use the interfaces is to call an _alloc() function and then a _setup() function to configure a struct for I/O operations, follow these with as many calls to the read/write function as needed, and call a _cleanup() function before exiting the program. The sections below give a brief overview of how the structs are expected to be configured before calling the setup functions, and of the expected behaviour while performing I/O operations.

lofar_udp_io_read Interface

Setup

A pre-initialised struct can be allocated on the heap by calling the lofar_udp_io_read_config_alloc() function. It may return NULL if the allocation fails.

The struct should then be configured, with at least the following members set:
- readerType
- (inputLocations OR inputDadaKeys)[0:numInputs]
- numInputs

The first two of these can be parsed directly from an input optarg flag or char array into the configuration struct using lofar_udp_io_read_parse_optarg(). This function expects an input string that follows the conventions detailed in the CLI readme.
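To illustrate the kind of substitution such a format string implies, the sketch below expands a `[[port]]` placeholder into a per-input file name. It is not the library's parser: `expand_port_placeholder()` is a hypothetical helper written for this example, and the meaning of any trailing comma-separated values is defined by the CLI readme rather than by this code.

```c
#include <stdio.h>
#include <string.h>

// Hypothetical helper (not part of lofar_udp_io): copy `format` into `out`,
// replacing the first "[[port]]" placeholder with the decimal value of `port`.
// Returns 0 on success, -1 if the result would not fit in `outLen` bytes.
static int expand_port_placeholder(const char *format, int port, char *out, size_t outLen) {
    static const char placeholder[] = "[[port]]";
    const char *match = strstr(format, placeholder);
    int written;

    if (match == NULL) {
        // No placeholder present: the name is used unchanged
        written = snprintf(out, outLen, "%s", format);
    } else {
        const int prefixLen = (int) (match - format);
        const char *suffix = match + strlen(placeholder);
        written = snprintf(out, outLen, "%.*s%d%s", prefixLen, format, port, suffix);
    }

    // snprintf reports the length it needed; a value >= outLen means truncation
    return (written < 0 || (size_t) written >= outLen) ? -1 : 0;
}
```

Under these assumptions, calling the helper with "myInputFile_[[port]].zst" and a port value of 2 fills the output buffer with "myInputFile_2.zst".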

Additionally, we recommend preparing:
- The maximum number of elements expected to be read per iteration
- A buffer that can hold this number of elements

Both of these can then be passed to the lofar_udp_io_read_setup_helper() function to simplify the setup process; it accounts for the requirements of some processing methods (such as the page sizes for Zstandard compressed buffers).
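The setup helper handles such size requirements itself, so you do not need to do this by hand. Purely to illustrate the idea, the sketch below rounds a requested buffer size up to a multiple of some alignment. Both the function name `round_up_to_multiple()` and the assumption that the requirement takes the form "a multiple of a fixed size" are hypothetical; the library's actual adjustment may differ.

```c
#include <stdint.h>

// Hypothetical helper (not part of lofar_udp_io): round `requested` up to the
// next multiple of `alignment`. Returns -1 if either argument is not positive.
// Overflow for values close to INT64_MAX is not handled in this sketch.
static int64_t round_up_to_multiple(int64_t requested, int64_t alignment) {
    if (requested <= 0 || alignment <= 0) {
        return -1;
    }

    const int64_t remainder = requested % alignment;
    if (remainder == 0) {
        return requested; // Already aligned
    }
    return requested + (alignment - remainder);
}
```

With a 4096-byte alignment, a request of 4097 bytes would become 8192 bytes, while the 2 MiB request used in the example below is already aligned and stays unchanged.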

If you want to call the lofar_udp_io_read_setup() function directly, the size should be stored in readBufSize[port] and decompressionTracker[port].size for each input port, while the buffer is used to initialise decompressionTracker[port].src if the readerType is ZSTDCOMPRESSED.

Operations

Repeated calls to lofar_udp_io_read(), each given a target buffer and a requested read length, will read as much data as possible from the source. By default, it will return the amount of data read, but it may return 0 or a negative value when no more data is available.

Read lengths must always be less than the maximum length set during the struct configuration.

For the ZSTANDARD reader, the input buffer must always match the buffer provided to the initialisation function, or errors may occur (this does not apply to the ZSTANDARD_INDIRECT mode).

Cleanup

A single call to lofar_udp_io_read_cleanup() with your configuration struct will close any references to the input files and clean up any memory allocated by the struct, before freeing the struct itself.

Example Usage

int32_t myFunc(void) {
    // `goto` use not recommended, used to keep example clean
    const char inputFormat[] = "myInputFile_[[port]].zst,0,2";
    // An enum constant (rather than a const variable) keeps the arrays below fixed-size,
    // so they can be initialised and are safe to reach through a `goto`
    enum { numInputs = 2 };
    const int64_t numBytesPerRead = 2 << 20; // 2 MiB per read
    int8_t *readerBuffers[numInputs] = { NULL };
    int64_t lastBytesRead[numInputs] = { 0 };
    int32_t returnVal = 0;

    lofar_udp_io_read_config *reader = lofar_udp_io_read_alloc();
    if (reader == NULL) {
        return 1;
    }
    reader->numInputs = numInputs;

    // Sets readerType, inputLocations, and associated base/offset/step values
    if (lofar_udp_io_read_parse_optarg(reader, inputFormat) < 0) {
        goto cleanup;
    }

    for (int32_t i = 0; i < numInputs; i++) {
        readerBuffers[i] = (int8_t*) calloc(numBytesPerRead, sizeof(int8_t));
        if (readerBuffers[i] == NULL) {
            goto cleanup;
        }

        if (lofar_udp_io_read_setup_helper(reader, readerBuffers, numBytesPerRead, i) < 0) {
            goto cleanup;
        }
    }

    // Operate on the data while requesting it
    while (returnVal == 0) {
        for (int32_t i = 0; i < numInputs; i++) {
            // A zero or negative return means no more data is available
            if ((lastBytesRead[i] = lofar_udp_io_read(reader, i, readerBuffers[i], numBytesPerRead)) <= 0) {
                returnVal = -1;
            }
        }
        // Do something with data
    }

    // Running out of input data is the expected way to leave the loop
    returnVal = 0;

    if (0) {
        cleanup:
        returnVal = 1;
    }
    lofar_udp_io_read_cleanup(reader);
    for (int32_t i = 0; i < numInputs; i++) {
        FREE_NOT_NULL(readerBuffers[i]);
    }
    return returnVal;
}

lofar_udp_io_write Interface

Setup

A pre-initialised struct can be allocated on the heap by calling the lofar_udp_io_write_config_alloc() function. It may return NULL if the allocation fails.

The struct should then be configured, with at least the following members set:
- readerType
- outputFormat
- progressWithExisting
- Any additional configuration required for the specific writer (zstdConfig, dadaConfig)

The first two of these can be parsed from a format string (see the CLI readme for formatting options) using the lofar_udp_io_write_parse_optarg() function.

Additionally, we recommend preparing:
- The maximum number of elements expected to be written per iteration
- The number of outputs
- A starting iteration (for output [[iter]] substitution)
- A starting reference number (for output [[pack]] substitution)

These can then be passed into the lofar_udp_io_write_setup_helper() function to help with initialisation and sanity checking. If you want to initialise the maximum write size yourself and still call this function, the outputLength array will be ignored if its first value is LONG_MIN.
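The LONG_MIN convention can be pictured with the short sketch below. It is only an illustration of a sentinel check: `apply_write_lengths()` and the `configured` array are hypothetical stand-ins, not the library's implementation or its struct members.

```c
#include <limits.h>
#include <stdint.h>

// Hypothetical stand-in for the check described above: copy the requested
// lengths into `configured`, unless the first entry is the LONG_MIN sentinel,
// in which case the values the caller configured earlier are left untouched.
// Returns the number of entries that were updated.
static int32_t apply_write_lengths(int64_t *configured, const int64_t *outputLength, int32_t numOutputs) {
    if (outputLength[0] == LONG_MIN) {
        return 0; // Sentinel found: ignore the whole array
    }

    for (int32_t i = 0; i < numOutputs; i++) {
        configured[i] = outputLength[i];
    }
    return numOutputs;
}
```

In this sketch only the first element is inspected, so the remaining entries of an array that starts with LONG_MIN can hold any value.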

Operations

Repeated calls to lofar_udp_io_write(), each given an input buffer and a requested write length, will write as much data as possible from that buffer to the output. By default, it will return the amount of data written, though it may return 0 or a negative value on an error.

Write lengths must always be less than the maximum length set during the struct configuration.

Cleanup

A single call to lofar_udp_io_write_cleanup() with your configuration struct will begin one of two cleanup processes, selected by the fullClean argument. When it is set to 0, the struct is reset so that it can be reused for future iterations; when it is set to any non-zero value, all open references and allocations are cleared and the struct itself is freed.

Example Usage

int32_t myFunc(void) {
    // `goto` use not recommended, used to keep example clean
    const char outputFormat[] = "myOutputFile_[[port]].zst,0,2";
    // An enum constant keeps numBytesPerWrite a fixed-size array rather than a VLA
    enum { numOutputs = 2 };
    int64_t numBytesPerWrite[numOutputs];
    int32_t returnVal = 0;

    lofar_udp_io_write_config *writer = lofar_udp_io_write_alloc();
    if (writer == NULL) {
        return 1;
    }
    writer->numOutputs = numOutputs;
    writer->progressWithExisting = 0; // Don't proceed if the output already exists
    writer->zstdConfig.compressionLevel = 3;
    writer->zstdConfig.numThreads = 2;

    // Sets readerType and outputFormat
    if (lofar_udp_io_write_parse_optarg(writer, outputFormat) < 0) {
        goto cleanup;
    }

    for (int8_t i = 0; i < numOutputs; i++) {
        numBytesPerWrite[i] = 2 << 20; // 2 MiB per write
    }

    if (lofar_udp_io_write_setup_helper(writer, numBytesPerWrite, numOutputs, 0, 0) < 0) {
        goto cleanup;
    }

    // Write the data as it becomes available
    while (returnVal == 0) {
        // give_me_data() is a placeholder for your own data source; it is assumed
        // here to return NULL once there is nothing left to write
        int8_t **workingData = give_me_data(numOutputs, numBytesPerWrite);
        if (workingData == NULL) {
            break;
        }
        for (int8_t i = 0; i < numOutputs; i++) {
            // A zero or negative return indicates a write error
            if ((lofar_udp_io_write(writer, i, workingData[i], numBytesPerWrite[i])) <= 0) {
                goto cleanup;
            }
        }
    }

    if (0) {
        cleanup:
        returnVal = 1;
    }
    lofar_udp_io_write_cleanup(writer, 1);
    return returnVal;
}