Stream Data

Use the Python client to stream data from a Synnax cluster.

Streaming data is useful for real-time processing, visualization, and monitoring. This page guides you through using the Python client to stream data from a Synnax cluster. If you’d like a conceptual overview of how streaming works in Synnax, check out the streams page.

Opening a Streamer

To start streaming data, call the open_streamer method on the client and provide a list of channels to stream:

streamer = client.open_streamer(["channel1", "channel2"])

Reading Frames

To read the next incoming frame, call the read method on the streamer:

frame = streamer.read()

This call will block until a new frame is available.

Specifying a Timeout

It’s also possible to add a timeout parameter to the read method. If the timeout is reached before a new frame is available, the method will return None instead of a frame:

frame = streamer.read(timeout=5) # Wait for 5 seconds
if frame is None:
    print("Timed out waiting for a frame")

You can also provide a TimeSpan object as the timeout parameter:

import synnax as sy

frame = streamer.read(timeout=5 * sy.TimeSpan.SECOND)
if frame is None:
    print("Timed out waiting for a frame")

Downsampling

If you are interested in streaming data at a lower rate, you can use the optional downsample_factor parameter. This parameter will cause the streamer to skip the specified number of samples in each series before returning the next frame. Note that this parameter does not average values, and will skip samples instead.

For example, if you set the downsample factor to 2, every 2nd sample will be retained.

stream = client.open_streamer(
# a streamer.read() will return [0.13, 0.14]
stream = client.open_streamer(
    channels=["temperature", "pressure"],
    downsample_factor=2
)

Handling Partial Frames

When reading frames from a streamer, it’s important to note that a frame may not contain data for every channel specified when opening the streamer. For example, if we’re reading from two sensors, temperature and pressure, that are being sampled by different devices at different rates, we may receive a frame containing data only for the first channel, followed by a frame containing only data for the second channel.

stream = client.open_streamer(["temperature", "pressure"])
frame = streamer.read()
print(frame[-1]) # Print only the last sample(s) in the frame
# Output: {"temperature": 25.0}
frame = streamer.read()
print(frame[-1])
# Output: {"pressure": 1013.25}
frame = streamer.read()
print(frame[-1])
# Output: {"temperature": 25.1, "pressure": 1013.25}

Using a For Loop

The streamer object is an iterator, so you can use it in a for loop to iterate over incoming frames, blocking on each iteration until a new frame is received:

for frame in streamer:
    print(frame)

Updating the Channel List

If you want to update the list of channels being streamed, you can call the update_channels method on the streamer:

streamer.update_channels(["channel3", "channel4"])

This method will replace the current list of channels with the new list, not add to it.

Closing the Streamer

After you’re done streaming, it’s essential that you call the close method on the streamer to release the network connection and other related resources:

streamer.close()

Using a Context Manager

We recommend using the streamer as a context manager where possible, as this makes it easy to ensure that the streamer is closed correctly:

with client.open_streamer(["channel1", "channel2"]) as streamer:
    for frame in streamer:
        print(frame)

Using an Async Streamer

If you’re interested in using asyncio to stream data, you can use the open_async_streamer method on the client. This streamer implements an identical interface to a synchronous streamer, but all methods are asynchronous:

async with await client.open_async_streamer(["channel1", "channel2"]) as streamer:
    async for frame in streamer:
        print(frame)