Freym
Streams

SDK

Freym Streams SDK documentation

Installation

To install the SDK, run the following command:

npm i --save @fraym/streams
go get -u github.com/fraym/freym-api/go

Configuration

Using ENV Variables

You can configure the SDK by setting the following environment variables:

Variable Name | Default | Description
STREAMS_CLIENT_ADDRESS | | Address of the streams service
STREAMS_CLIENT_GROUP_ID | "" | The group ID that identifies the application

Then use the configuration like this:

import { getEnvConfig } from "@fraym/streams";

...

const config = getEnvConfig();
import (
    "github.com/Becklyn/go-wire-core/env"
    "github.com/fraym/freym-api/go/streams/config"
)

...

_ = env.New() // this reads your environment variables from a `.env` or `.env.local` file
conf := config.NewEnvConfig()

Using Code Configuration

import { useConfigDefaults } from "@fraym/streams";

...

const config = useConfigDefaults({
    serverAddress: "my.streams.service",
    groupId: "my-app",
    deploymentId: 0, // optional, used for filtering events by deployment
    // 0 = no deployment filter
    // > 0 = filter by deployment: you will get events of deploymentIds <= this value
});
import "github.com/fraym/freym-api/go/streams/config"

...

conf := config.NewDefaultConfig()
conf.Address = "my.streams.service"
conf.GroupId = "my-app"

Usage

Initialize the Client

import { newClient } from "@fraym/streams";

...

const client = await newClient({
    serverAddress: "my.sync.service",
    groupId: "my-app",
});

...

// use your client here

... 

// this will gracefully end the connection to the streams service 
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
await client.close();
import (
	"github.com/fraym/freym-api/go/streams"
	"github.com/fraym/golog"
)

logger := golog.NewZerologLogger()
client, err := streams.NewClient(logger, conf)
if err != nil {
    logger.Fatal().WithError(err).Write()
}

// this will gracefully end the connection to the streams service 
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
defer client.Close()

// use your client here

Publishing Events

The Streams service distinguishes between two types of events: Persistent Events and Messages.

  • Persistent events are stored in the event store and can be retrieved later.
  • Messages are not stored in the event store and are only used for communication between services. We also refer to messages as broadcast events.

Use messages for real-time communication between services where the event history is not important. Use persistent events to store business-critical data where you want to keep track of the event history.

The Streams service allows you to publish multiple events in a transactional manner. If an error occurs while publishing one of the events, none of the events will be published.

While an event must be associated with a topic, it can also be associated with a stream. This enables event streaming functionality for the event.

await client.publish("topic", [
    {
        tenantId: "tenant-id",
        type: "event-type",
        payload: {
            key: "value",
            otherKey: 123,
        },
    },
]);
import "github.com/fraym/freym-api/go/streams/domain/dto"

// ...

if err := client.Publish(
    context.TODO(),
    "topic",
    []*dto.PublishEvent{
        {
            TenantId: "tenant-id",
            Type: "event-type",
            Payload: dto.PublishEventPayloadMap{
                "key": {Value: "value"},
                "otherKey": {Value: 123},
            },
        }
    }
); err != nil {
    panic(err)
}
mutation Publish {
    publish(input: {
        topic: "topic",
        events: [{
            tenantId: "tenant-id",
            type: "event-type",
            payload:  [{
                key: "key",
                value: "\"value\"" 
            }, {
                key: "otherKey",
                value: "123"
            }]
        }]
    })
}

Subscribing to Events

The Streams service allows subscriptions to both persistent events and messages. It is possible to create a subscription that listens to multiple topics at once.

If the subscription handler fails to process a persistent event, the event is retried until it is successfully processed. Messages are not retried.

const ignoreUnhandledEvents = false; // optional parameter, if true, the subscription will treat events that have no handler as handled
const parallelTopicProcessing = false; // optional parameter, if true, the subscription will process events of different streams in parallel
const subscription = client.subscribe(["test"], ignoreUnhandledEvents, parallelTopicProcessing);
subscription.useHandlerForAllTypes(async e => {
    // handle the event here, throw in case of error
});

subscription.useHandler("event-type", async e => {
    // handle the event here, throw in case of error
});

subscription.start();

// stop the connection when you are done
subscription.stop();
import "github.com/fraym/freym-api/go/streams/domain/dto"

// ...
deploymentId := 0; // if > 0 you will only get events of deploymentIds <= this value
ignoreUnhandledEvents := false; // if true, the subscription will treat events that have no handler as handled
parallelTopicProcessing := false; // if true, the subscription will process events of different streams in parallel
subscription := client.NewSubscription([]string{"topic"}, ignoreUnhandledEvents, deploymentId, parallelTopicProcessing)
subscription.UseHandlerForAllEventTypes(
    func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
        // handle the event here
        return true, nil
    }
)

subscription.Use("event-type",
    func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
        // handle the event here
        return true, nil
    }
)

subscription.Start()

// stop the connection when you are done
subscription.Stop()
subscription Topics {
  topics(input: {
    topics: ["topic"]
    metadata: {
      group: "my-app"
      deploymentId: 0 # if > 0 you will only get events of deploymentIds <= this value
      parallelTopicProcessing: false # if true, the subscription will process events of different streams in parallel
    }
  }) {
    ... on TopicSubscriptionDataEvent {
      event {
        id
      }
    }
    ... on TopicSubscriptionDataSubscribed {
      error
      subscriberId
    }
    ... on TopicSubscriptionDataPanic {
      reason
    }
  }
}

mutation EventHandled($input: EventHandledInput!) {
  eventHandled(input: {
    tenantId: "tenant-id"
    topic: "topic"
    stream: "stream"
    id: "event-id"
    error: "error-message" ## optional leave empty on no error
    retry: true
  })
}

You can register only one handler per event type for a subscription, and only one handler for all event types.

If you register an "all event types" handler, it will be called for all event types that do not have a specific handler registered.

Parallel Topic Processing

You can use a parallelTopicProcessing subscription and a non-parallel topic processing subscription at the same time. Events that were handled by a parallel topic processing subscription will be handled by the non-parallel topic processing subscription as well.

Subscription Error Handling

In case of errors during event handling, the event can be resent to another subscriber. This is done by sending a Handled message with an error to the Streams service. The Streams service will then try to deliver the event to another subscriber.

The subscriber gets the number of retries that happened for an event within the metadata of the event.

If you want to retry the event, you can return false from the handler.

import { StreamHandlerError } from "@fraym/streams";

subscription.useHandlerForAllTypes(async e => {
    try {
        // handle the event here
    } catch (err) {
        throw new StreamHandlerError(err, e.retryNumber > 3);
    }
});
subscription.UseHandlerForAllEventTypes(
    func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
        var err error

        // handle the event here:
        // err = handleEvent(event)

        if err != nil {
            return event.RetryNumber > 3,err
        }
        return false,nil
    }
)
## use the retry field to indicate that the event should be retried
mutation EventHandled {
  eventHandled(input: {
    tenantId: "tenant-id"
    topic: "topic"
    id: "event-id"
    error: "error-message" ## optional leave empty on no error
    retry: true
  })
}

Querying Events

The Streams service provides several APIs to query events:

Get an Event by Its ID

const event = await client.getEvent( "tenant-id", "topic", "event-id");
event, err := client.GetEvent(context.TODO(), "tenant-id", "topic", "event-id")
if err != nil {
    panic(err)
}
query GetEvent {
  getEvent(input: {
    tenantId: "tenant-id"
    topic: "topic"
    id: "event-id"
  }) {
    id
  }
}

Get the Latest Event Within a Topic

const lastEvent = await client.getLastEvent("tenant-id", "topic");
lastEvent, err := client.GetLastEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
    panic(err)
}
query GetLastEvent {
  getLastEvent(input: {
    tenantId: "tenant-id"
    topic: "topic"
  }) {
    id
  }
}

Get the Latest Handled Event Within a Topic

const lastHandledEvent = await client.getLastHandledEvent("tenant-id", "topic");
lastHandledEvent, err := client.GetLastHandledEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
    panic(err)
}
query GetLastHandledEvent {
  getLastHandledEvent(input: {
    tenantId: "tenant-id"
    topic: "topic"
    groupId: "my-app"
  }) {
    id
  }
}

Get the Last Event Within a Topic That Belongs to a Given List of Event Types

const lastEvent = await client.getLastEventByTypes("tenant-id", "topic", ["event-type"]);
lastEvent, err := client.GetLastEventByTypes(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"}
)
if err != nil {
    panic(err)
}
query GetLastEventByTypes {
  getLastEventByTypes(input: {
    tenantId: "tenant-id"
    topic: "topic"
    types: ["event-type"]
  }) {
    id
  }
}

Paginate Through All Events in a Topic That Belong to a Given List of Event Types

const pageSize = 100; // number of events to fetch per request
await client.iterateAllEvents("tenant-id", "topic", ["event-type"], pageSize, event => {
    // process the event here
});
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory

isEmpty, err := client.IterateAllEvents(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
    pageSize,
    bufferSize,
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    }
)
if err != nil {
    panic(err)
}
## Using the GraphQL api you have to iterate pagination manually
query GetEvents {
  getEvents(input: {
    tenantId: "tenant-id",
    page: 0,
    perPage: 100,
    topic: "topic",
    types: ["event-type"]
  }) {
    id
  }
}

Paginate Through All Events in a Topic That Belong to a Given List of Event Types That Come After a Specific Event

const pageSize = 100; // number of events to fetch per request
await client.iterateAllEventsAfterEvent(
    "tenant-id", 
    "topic", 
    ["event-type"], 
    "event-id", 
    pageSize, 
    event => {
        // process the event here
    }
);
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory

isEmpty, err := client.IterateAllEventsAfterEvent(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
    "event-id",
    pageSize,
    bufferSize,
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    }
)
if err != nil {
    panic(err)
}
## Using the GraphQL api you have to iterate pagination manually
query GetEventsAfterEventId {
  getEvents(input: {
    id: "event-id",
    tenantId: "tenant-id",
    page: 0,
    perPage: 100,
    topic: "topic",
    types: ["event-type"]
  }) {
    id
  }
}

Querying Streams

The Streams service provides several APIs to query streams:

Check if a Stream Is Empty

const pageSize = 100; // number of events to fetch per request
const streamIterator = await client.getStreamIterator("topic", "tenant-id", "stream", pageSize);

const isEmpty = await streamIterator.isEmpty();
isEmpty, err := client.IsStreamEmpty(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
)
if err != nil {
    panic(err)
}
query IsStreamEmpty {
  isStreamEmpty(input: {
    tenantId: "tenant-id",
    topic: "topic",
    stream: "stream",
  })
}

Paginate Through All Events of a Stream

const streamIterator = await client.getStreamIterator(
    "topic",
    "tenant-id", 
    "stream", 
    100,// number of events to fetch per request
    false, // optional parameter, if true, the stream iterator will not use snapshots
);

await streamIterator.forEach(event => {
    // process the event here
});
isEmpty, err := client.IterateStream(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    0, // id of the deployment, 0 ignores the deployment filter
    100, // number of events to fetch per request
    1000, // number of events to buffer in memory
    false, // optional parameter, if true, the stream iterator will not use snapshots
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    }
)
if err != nil {
    panic(err)
}
## Using the GraphQL api you have to iterate pagination manually
query GetStream {
  getStream(input: {
    tenantId: "tenant-id",
    topic: "topic",
    stream: "stream",
    page: 0,
    perPage: 100,
    deploymentId: 0, # optional: if you add this parameter, you will only get events of deploymentIds <= this value
    snapshotEventId: "snapshot-event-id", # optional: if you add this parameter, you will only get events after the snapshot event and the snapshot event itself
    doNotUseSnapshots: false # optional: if true, the query will not use snapshots
  }) {
    events {
      id
    }
    streamEventId
  }
}

Paginate Through All Events of a Stream That Come After a Specific Event

const streamIterator = await client.getStreamIterator(
    "topic", 
    "tenant-id", 
    "stream",
    100, // number of events to fetch per request
    false, // optional parameter, if true, the stream iterator will not use snapshots
);

await streamIterator.forEachAfterEvent("event-id", event => {
// process the event here
});
isEmpty, err := client.IterateStreamAfterEvent(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    "event-id",
    0, // id of the deployment, 0 ignores the deployment filter
    100, // number of events to fetch per request
    1000, // number of events to buffer in memory
    false, // optional parameter, if true, the stream iterator will not use snapshots
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    }
)
if err != nil {
    panic(err)
}
## Using the GraphQL api you have to iterate pagination manually
query GetStreamAfterEventId {
  getStreamAfterEventId(input: {
    eventId: "event-id",
    tenantId: "tenant-id",
    topic: "topic",
    stream: "stream",
    page: 0,
    perPage: 100,
    deploymentId: 0, # optional: if you add this parameter, you will only get events of deploymentIds <= this value
    snapshotEventId: "snapshot-event-id",# optional: if you add this parameter, you will only get events after the snapshot event and the snapshot event itself
    doNotUseSnapshots: false # optional: if true, the query will not use snapshots
  }) {
    events {
      id
    }
    streamEventId
  }
}

Transaction Handling

You can wait for all events of a transaction (= correlation id) to be handled by a given set of consumer groups.

await client.waitForTransactionalConsistency(
    "tenant-id",
    "topic",
    "correlation-id",
    ["consumer-group"]
);
if err := client.WaitForTransactionalConsistency(
    context.TODO(),
    "tenant-id",
    "topic",
    "correlation-id",
    []string{"consumer-group"},
); err != nil {
    panic(err)
}
query WaitForTransactionalConsistency {
  waitForTransactionalConsistency(input: {
    tenantId: "tenant-id",
    topic: "topic",
    correlationId: "correlation-id",
    consumerGroups: ["consumer-group"],
  })
}

Snapshotting

Snapshotting can be used to optimize performance when reading event streams. When a snapshot event is added to the event stream, all APIs that paginate the stream will not return any events that occurred before the snapshot. Instead, they return the snapshot event and any events that come after the snapshot.

await client.createStreamSnapshot(
    "tenant-id", 
    "topic", 
    "stream", 
    "last-snapshotted-event-id", 
    {
        tenantId: "tenant-id",
        type: "snapshot-event-type",
        payload: {
            field: "value",
        },
    }
);
if err := client.CreateStreamSnapshot(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    "last-snapshotted-event-id",
    &dto.PublishEvent{
        TenantId: "tenant-id",
        Type: "snapshot-event-type",
        Payload: dto.PublishEventPayloadMap{
            "field": {
                Value: "value",
            },
        },
    }
); err != nil {
    panic(err)
}
mutation CreateStreamSnapshot {
  createStreamSnapshot(input: {
    tenantId: "tenant-id",
    topic: "topic",
    stream: "stream",
    lastSnapshottedEventId: "last-snapshotted-event-id",
    snapshotEvent: {
      tenantId: "tenant-id",
      type: "snapshot-event-type",
      payload: [{
        key: "field",
        value: "\"value\"",
      }]
    }
  })
}

GDPR

When publishing an event, a payload field can be associated with GDPR logic by specifying a default value for the field. The default will be applied if the field is invalidated.

await client.publish("topic", [
    {
        tenantId: "tenant-id",
        type: "event-type",
        payload: {
            field: {
                value: "value",
                gdprDefault: "default-value",
            },
        },
    },
]);
import "github.com/fraym/freym-api/go/streams/domain/dto"

// ...

if err := client.Publish(
    context.TODO(),
    "topic",
    []*dto.PublishEvent{
        {
            TenantId: "tenant-id",
            Type: "event-type",
            Payload: dto.PublishEventPayloadMap{
                "field": {
                    Value: "value",
                    Gdpr: &dto.PublishGdprEventPayload{
                        Default: "default-value",
                    }
                },
            },
        }
    }
); err != nil {
    panic(err)
}
mutation Publish {
    publish(input: {
        topic: "topic",
        events: [{
            tenantId: "tenant-id",
            type: "event-type",
            payload:  [{
                key: "field",
                value: "\"value\"",
                gdpr: {
                    default: "\"default-value\"",
                }
            }]
        }]
    })
}
await client.introduceGdprOnEventField(
    "tenant-id", 
    "default-value", 
    "topic", 
    "event-id", 
    "field"
);
if err := client.IntroduceGdprOnEventField(
    context.TODO(),
    "tenant-id",
    "topic",
    "event-id",
    "field",
    "default-value"
); err != nil {
    panic(err)
}
mutation IntroduceGdprOnEventField {
  introduceGdprOnEventField(input: {
    tenantId: "tenant-id",
    topic: "topic",
    eventId: "event-id",
    fieldName: "field",
    defaultValue: "\"default-value\"",
  })
}

When GDPR-relevant data is published, an event of type gdpr-data-recorded is published to the gdpr topic. This event contains all the information needed to build the logic to invalidate the data if necessary. While the CRUD service takes care of automatically invalidating GDPR-relevant CRUD data, you need to implement GDPR logic using the events in the gdpr topic for any events that your business logic uses.

The invalidation API ensures that the GDPR-relevant data is deleted and replaced with the default value. After the invalidation, there is no way to recover the original data, as any trace of it is removed.

await client.invalidateGdprData("tenant-id","topic", "gdpr-id");
if err := client.InvalidateGdprData(
    context.TODO(),
    "tenant-id",
    "topic",
    "gdpr-id"
); err != nil {
    panic(err)
}
mutation InvalidateGdpr {
  invalidateGdpr(input: {
    tenantId: "tenant-id",
    topic: "topic",
    gdprId: "gdpr-id",
  })
}

Renaming of Event Types

If you need to rename an event type, you can use the renameEventType API. No data is lost in this operation, as the event type is simply renamed.

await client.renameEventType("topic", "old-event-type", "new-event-type");
if err := client.RenameEventType(
    context.TODO(),
    "topic",
    "old-event-type",
    "new-event-type",
); err != nil {
    panic(err)
}
mutation RenameEventType {
  renameEventType(input: {
    topic: "topic",
    oldType: "old-event-type",
    newType: "new-event-type",
  })
}

Erroneous Events

List Erroneous Events

Returns a list of erroneous events sorted by the event date.

const erroneousEvents = await client.listErroneousEvents(
    "tenant-id",
    "topic",
    ["event-type"],
    1000, // limit
);
if erroneousEvents, err := client.ListErroneousEvents(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
    1000, // limit
); err != nil {
    panic(err)
}
query GetErroneousEvents {
  getErroneousEvents(input: {
    tenantId: "tenant-id",
    topic: "topic",
    eventTypes: ["event-type"], # leave empty to get all event types
    limit: 1000, # optional
  }) {
    consumerGroup
    error
    event {
      id
    }
  }
}

Resend Erroneous Events

await client.resendErroneousEvent("tenant-id", "topic", "consumer-group", "event-id");
if err := client.ResendErroneousEvent(
    context.TODO(),
    "tenant-id",
    "topic",
    "consumer-group",
    "event-id",
); err != nil {
    panic(err)
}
mutation ResendErroneousEvent {
  resendErroneousEvent(input: {
    tenantId: "tenant-id",
    topic: "topic",
    consumerGroup: "consumer-group",
    eventId: "event-id",
  })
}