SDK
Freym Streams SDK documentation
Installation
To install the SDK, run the following command:
npm i --save @fraym/streams
go get -u github.com/fraym/freym-api/go
Configuration
Using ENV Variables
You can configure the SDK by setting the following environment variables:
| Variable Name | Default | Description |
|---|---|---|
| STREAMS_CLIENT_ADDRESS | | Address of the streams service |
| STREAMS_CLIENT_GROUP_ID | "" | The group ID that identifies the application |
Then use the configuration like this:
import { getEnvConfig } from "@fraym/streams";
...
const config = getEnvConfig();
import (
"github.com/Becklyn/go-wire-core/env"
"github.com/fraym/freym-api/go/streams/config"
)
...
_ = env.New() // this reads your environment variables from a `.env` or `.env.local` file
conf := config.NewEnvConfig()
Using Code Configuration
import { useConfigDefaults } from "@fraym/streams";
...
const config = useConfigDefaults({
serverAddress: "my.streams.service",
groupId: "my-app",
deploymentId: 0, // optional, used for filtering events by deployment
// 0 = no deployment filter
// > 0 = filter by deployment: you will get events of deploymentIds <= this value
});
import "github.com/fraym/freym-api/go/streams/config"
...
conf := config.NewDefaultConfig()
conf.Address = "my.streams.service"
conf.GroupId = "my-app"
Usage
Initialize the Client
import { newClient } from "@fraym/streams";
...
const client = await newClient({
serverAddress: "my.streams.service",
groupId: "my-app",
});
...
// use your client here
...
// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
await client.close();
import (
"github.com/fraym/freym-api/go/streams"
"github.com/fraym/golog"
)
logger := golog.NewZerologLogger()
client, err := streams.NewClient(logger, conf)
if err != nil {
logger.Fatal().WithError(err).Write()
}
// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
defer client.Close()
// use your client here
Publishing Events
The Streams service distinguishes between two types of events: Persistent Events and Messages.
- Persistent events are stored in the event store and can be retrieved later.
- Messages are not stored in the event store and are only used for communication between services. We also refer to messages as broadcast events.
Use messages for real-time communication between services where the event history is not important. Use persistent events to store business-critical data where you want to keep track of the event history.
The Streams service allows you to publish multiple events in a transactional manner. If an error occurs while publishing one of the events, none of the events will be published.
While an event must be associated with a topic, it can also be associated with a stream. This enables event streaming functionality for the event.
await client.publish("topic", [
{
tenantId: "tenant-id",
type: "event-type",
payload: {
key: "value",
otherKey: 123,
},
},
]);
import "github.com/fraym/freym-api/go/streams/domain/dto"
// ...
if err := client.Publish(
context.TODO(),
"topic",
[]*dto.PublishEvent{
{
TenantId: "tenant-id",
Type: "event-type",
Payload: dto.PublishEventPayloadMap{
"key": {Value: "value"},
"otherKey": {Value: 123},
},
},
},
); err != nil {
panic(err)
}
mutation Publish {
publish(input: {
topic: "topic",
events: [{
tenantId: "tenant-id",
type: "event-type",
payload: [{
key: "key",
value: "\"value\""
}, {
key: "otherKey",
value: "123"
}]
}]
})
}
Subscribing to Events
The Streams service allows subscriptions to both persistent events and messages. It is possible to create a subscription that listens to multiple topics at once.
If the subscription handler fails to process a persistent event, the event is retried until it is successfully processed. Messages are not retried.
const ignoreUnhandledEvents = false; // optional parameter, if true, the subscription will treat events that have no handler as handled
const parallelTopicProcessing = false; // optional parameter, if true, the subscription will process events of different streams in parallel
const subscription = client.subscribe(["test"], ignoreUnhandledEvents, parallelTopicProcessing);
subscription.useHandlerForAllTypes(async e => {
// handle the event here, throw in case of error
});
subscription.useHandler("event-type", async e => {
// handle the event here, throw in case of error
});
subscription.start();
// stop the connection when you are done
subscription.stop();
import "github.com/fraym/freym-api/go/streams/domain/dto"
// ...
deploymentId := 0; // if > 0 you will only get events of deploymentIds <= this value
ignoreUnhandledEvents := false; // if true, the subscription will treat events that have no handler as handled
parallelTopicProcessing := false; // if true, the subscription will process events of different streams in parallel
subscription := client.NewSubscription([]string{"topic"}, ignoreUnhandledEvents, deploymentId, parallelTopicProcessing)
subscription.UseHandlerForAllEventTypes(
func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
// handle the event here
return true, nil
},
)
subscription.Use("event-type",
func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
// handle the event here
return true, nil
},
)
subscription.Start()
// stop the connection when you are done
subscription.Stop()
subscription Topics {
topics(input: {
topics: ["topic"]
metadata: {
group: "my-app"
deploymentId: 0 # if > 0 you will only get events of deploymentIds <= this value
parallelTopicProcessing: false # if true, the subscription will process events of different streams in parallel
}
}) {
... on TopicSubscriptionDataEvent {
event {
id
}
}
... on TopicSubscriptionDataSubscribed {
error
subscriberId
}
... on TopicSubscriptionDataPanic {
reason
}
}
}
mutation EventHandled($input: EventHandledInput!) {
eventHandled(input: {
tenantId: "tenant-id"
topic: "topic"
stream: "stream"
id: "event-id"
error: "error-message" ## optional leave empty on no error
retry: true
})
}
You can only register one handler per event type for a subscription. And you can only register one handler for all event types.
In case you register an "all event types" handler, it will be called for all event types that do not have a specific handler registered.
Parallel Topic Processing
You can use a parallelTopicProcessing subscription and a non parallel topic processing subscription at the same time.
Events that were handled by a parallel topic processing subscription will be handled by the non parallel topic processing subscription as well.
Subscription Error Handling
In case of errors during event handling, the event can be resent to another subscriber.
This is done by sending a Handled message with an error to the Streams service.
The Streams service will then try to deliver the event to another subscriber.
The subscriber gets the number of retries that happened for an event within the metadata of the event.
If you want to retry the event, you can return false from the handler.
import { StreamHandlerError } from "@fraym/streams";
subscription.useHandlerForAllTypes(async e => {
try {
// handle the event here
} catch (err) {
throw new StreamHandlerError(err, e.retryNumber > 3);
}
});
subscription.UseHandlerForAllEventTypes(
func(ctx context.Context, event *dto.SubscriptionEvent) (bool, error) {
var err error
// handle the event here:
// err = handleEvent(event)
if err != nil {
return event.RetryNumber > 3,err
}
return false,nil
},
)
## use the retry field to indicate that the event should be retried
mutation EventHandled {
eventHandled(input: {
tenantId: "tenant-id"
topic: "topic"
id: "event-id"
error: "error-message" ## optional leave empty on no error
retry: true
})
}
Querying Events
The Streams service provides several APIs to query events:
Get an Event by Its ID
const event = await client.getEvent("tenant-id", "topic", "event-id");
event, err := client.GetEvent(context.TODO(), "tenant-id", "topic", "event-id")
if err != nil {
panic(err)
}
query GetEvent {
getEvent(input: {
tenantId: "tenant-id"
topic: "topic"
id: "event-id"
}) {
id
}
}
Get the Latest Event Within a Topic
const lastEvent = await client.getLastEvent("tenant-id", "topic");
lastEvent, err := client.GetLastEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
panic(err)
}
query GetLastEvent {
getLastEvent(input: {
tenantId: "tenant-id"
topic: "topic"
}) {
id
}
}
Get the Latest Handled Event Within a Topic
const lastHandledEvent = await client.getLastHandledEvent("tenant-id", "topic");
lastHandledEvent, err := client.GetLastHandledEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
panic(err)
}
query GetLastHandledEvent {
getLastHandledEvent(input: {
tenantId: "tenant-id"
topic: "topic"
groupId: "my-app"
}) {
id
}
}
Get the Last Event Within a Topic That Belongs to a Given List of Event Types
const lastEvent = await client.getLastEventByTypes("tenant-id", "topic", ["event-type"]);
lastEvent, err := client.GetLastEventByTypes(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
)
if err != nil {
panic(err)
}
query GetLastEventByTypes {
getLastEventByTypes(input: {
tenantId: "tenant-id"
topic: "topic"
types: ["event-type"]
}) {
id
}
}
Paginate Through All Events in a Topic That Belong to a Given List of Event Types
const pageSize = 100; // number of events to fetch per request
await client.iterateAllEvents("tenant-id", "topic", ["event-type"], pageSize, event => {
// process the event here
});
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory
isEmpty, err := client.IterateAllEvents(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
pageSize,
bufferSize,
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
},
)
if err != nil {
panic(err)
}
## Using the GraphQL API you have to iterate pagination manually
query GetEvents {
getEvents(input: {
tenantId: "tenant-id",
page: 0,
perPage: 100,
topic: "topic",
types: ["event-type"]
}) {
id
}
}
Paginate Through All Events in a Topic That Belong to a Given List of Event Types That Come After a Specific Event
const pageSize = 100; // number of events to fetch per request
await client.iterateAllEventsAfterEvent(
"tenant-id",
"topic",
["event-type"],
"event-id",
pageSize,
event => {
// process the event here
}
);
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory
isEmpty, err := client.IterateAllEventsAfterEvent(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
"event-id",
pageSize,
bufferSize,
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
},
)
if err != nil {
panic(err)
}
## Using the GraphQL API you have to iterate pagination manually
query GetEventsAfterEventId {
getEvents(input: {
id: "event-id",
tenantId: "tenant-id",
page: 0,
perPage: 100,
topic: "topic",
types: ["event-type"]
}) {
id
}
}
Querying Streams
The Streams service provides several APIs to query streams:
Check if a stream is empty
const pageSize = 100; // number of events to fetch per request
const streamIterator = await client.getStreamIterator("topic", "tenant-id", "stream", pageSize);
const isEmpty = await streamIterator.isEmpty();
isEmpty, err := client.IsStreamEmpty(
context.TODO(),
"tenant-id",
"topic",
"stream",
)
if err != nil {
panic(err)
}
query IsStreamEmpty {
isStreamEmpty(input: {
tenantId: "tenant-id",
topic: "topic",
stream: "stream",
})
}
Paginate Through All Events of a Stream
const streamIterator = await client.getStreamIterator(
"topic",
"tenant-id",
"stream",
100,// number of events to fetch per request
false, // optional parameter, if true, the stream iterator will not use snapshots
);
await streamIterator.forEach(event => {
// process the event here
});
isEmpty, err := client.IterateStream(
context.TODO(),
"tenant-id",
"topic",
"stream",
0, // id of the deployment, 0 ignores the deployment filter
100, // number of events to fetch per request
1000, // number of events to buffer in memory
false, // optional parameter, if true, the stream iterator will not use snapshots
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
},
)
if err != nil {
panic(err)
}
## Using the GraphQL API you have to iterate pagination manually
query GetStream {
getStream(input: {
tenantId: "tenant-id",
topic: "topic",
stream: "stream",
page: 0,
perPage: 100,
deploymentId: 0, # optional: if you add this parameter, you will only get events of deploymentIds <= this value
snapshotEventId: "snapshot-event-id", # optional: if you add this parameter, you will only get events after the snapshot event and the snapshot event itself
doNotUseSnapshots: false # optional: if true, the query will not use snapshots
}) {
events {
id
}
streamEventId
}
}
Paginate Through All Events of a Stream That Come After a Specific Event
const streamIterator = await client.getStreamIterator(
"topic",
"tenant-id",
"stream",
100, // number of events to fetch per request
false, // optional parameter, if true, the stream iterator will not use snapshots
);
await streamIterator.forEachAfterEvent("event-id", event => {
// process the event here
});
isEmpty, err := client.IterateStreamAfterEvent(
context.TODO(),
"tenant-id",
"topic",
"stream",
"event-id",
0, // id of the deployment, 0 ignores the deployment filter
100, // number of events to fetch per request
1000, // number of events to buffer in memory
false, // optional parameter, if true, the stream iterator will not use snapshots
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
},
)
if err != nil {
panic(err)
}
## Using the GraphQL API you have to iterate pagination manually
query GetStreamAfterEventId {
getStreamAfterEventId(input: {
eventId: "event-id",
tenantId: "tenant-id",
topic: "topic",
stream: "stream",
page: 0,
perPage: 100,
deploymentId: 0, # optional: if you add this parameter, you will only get events of deploymentIds <= this value
snapshotEventId: "snapshot-event-id",# optional: if you add this parameter, you will only get events after the snapshot event and the snapshot event itself
doNotUseSnapshots: false # optional: if true, the query will not use snapshots
}) {
events {
id
}
streamEventId
}
}
Transaction Handling
You can wait for all events of a transaction (= correlation id) to be handled by a given set of consumer groups.
await client.waitForTransactionalConsistency(
"tenant-id",
"topic",
"correlation-id",
["consumer-group"]
);
if err := client.WaitForTransactionalConsistency(
context.TODO(),
"tenant-id",
"topic",
"correlation-id",
[]string{"consumer-group"},
); err != nil {
panic(err)
}
query WaitForTransactionalConsistency {
waitForTransactionalConsistency(input: {
tenantId: "tenant-id",
topic: "topic",
correlationId: "correlation-id",
consumerGroups: ["consumer-group"],
})
}
Snapshotting
Snapshotting can be used to optimize performance when reading event streams. When a snapshot event is added to the event stream, all APIs that paginate the stream will not return any events that occurred before the snapshot. Instead, they return the snapshot event and any events that come after the snapshot.
await client.createStreamSnapshot(
"tenant-id",
"topic",
"stream",
"last-snapshotted-event-id",
{
tenantId: "tenant-id",
type: "snapshot-event-type",
payload: {
field: "value",
},
}
);
if err := client.CreateStreamSnapshot(
context.TODO(),
"tenant-id",
"topic",
"stream",
"last-snapshotted-event-id",
&dto.PublishEvent{
TenantId: "tenant-id",
Type: "snapshot-event-type",
Payload: dto.PublishEventPayloadMap{
"field": {
Value: "value",
},
},
},
); err != nil {
panic(err)
}
mutation CreateStreamSnapshot {
createStreamSnapshot(input: {
tenantId: "tenant-id",
topic: "topic",
stream: "stream",
lastSnapshottedEventId: "last-snapshotted-event-id",
snapshotEvent: {
tenantId: "tenant-id",
type: "snapshot-event-type",
payload: [{
key: "field",
value: "\"value\"",
}]
}
})
}
GDPR
When publishing an event, a payload field can be associated with GDPR logic by specifying a default value for the field. The default will be applied if the field is invalidated.
await client.publish("topic", [
{
tenantId: "tenant-id",
type: "event-type",
payload: {
field: {
value: "value",
gdprDefault: "default-value",
},
},
},
]);
import "github.com/fraym/freym-api/go/streams/domain/dto"
// ...
if err := client.Publish(
context.TODO(),
"topic",
[]*dto.PublishEvent{
{
TenantId: "tenant-id",
Type: "event-type",
Payload: dto.PublishEventPayloadMap{
"field": {
Value: "value",
Gdpr: &dto.PublishGdprEventPayload{
Default: "default-value",
},
},
},
},
},
); err != nil {
panic(err)
}
mutation Publish {
publish(input: {
topic: "topic",
events: [{
tenantId: "tenant-id",
type: "event-type",
payload: [{
key: "field",
value: "\"value\"",
gdpr: {
default: "\"default-value\"",
}
}]
}]
})
}
await client.introduceGdprOnEventField(
"tenant-id",
"default-value",
"topic",
"event-id",
"field"
);
if err := client.IntroduceGdprOnEventField(
context.TODO(),
"tenant-id",
"topic",
"event-id",
"field",
"default-value",
); err != nil {
panic(err)
}
mutation IntroduceGdprOnEventField {
introduceGdprOnEventField(input: {
tenantId: "tenant-id",
topic: "topic",
eventId: "event-id",
fieldName: "field",
defaultValue: "\"default-value\"",
})
}
When GDPR-relevant data is published, an event of type gdpr-data-recorded is published to the gdpr topic.
This event contains all the information needed to build the logic to invalidate the data if necessary.
While the CRUD service takes care of automatically invalidating GDPR relevant CRUD data,
you need to implement GDPR logic using the events in the gdpr topic for any events that your business logic uses.
The invalidation API ensures that the GDPR relevant data is deleted and replaced with the default value. After the invalidation, there is no way to recover the original data as any trace of it is removed.
await client.invalidateGdprData("tenant-id", "topic", "gdpr-id");
if err := client.InvalidateGdprData(
context.TODO(),
"tenant-id",
"topic",
"gdpr-id",
); err != nil {
panic(err)
}
mutation InvalidateGdpr {
invalidateGdpr(input: {
tenantId: "tenant-id",
topic: "topic",
gdprId: "gdpr-id",
})
}
Renaming of Event Types
If you need to rename an event type, you can use the renameEventType API.
No data is lost in this operation, as the event type is simply renamed.
await client.renameEventType("topic", "old-event-type", "new-event-type");
if err := client.RenameEventType(
context.TODO(),
"topic",
"old-event-type",
"new-event-type",
); err != nil {
panic(err)
}
mutation RenameEventType {
renameEventType(input: {
topic: "topic",
oldType: "old-event-type",
newType: "new-event-type",
})
}
Erroneous Events
List Erroneous Events
Returns a list of erroneous events sorted by the event date.
const erroneousEvents = await client.listErroneousEvents(
"tenant-id",
"topic",
["event-type"],
1000, // limit
);
if erroneousEvents, err := client.ListErroneousEvents(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
1000, // limit
); err != nil {
panic(err)
}
query GetErroneousEvents {
getErroneousEvents(input: {
tenantId: "tenant-id",
topic: "topic",
eventTypes: ["event-type"], # leave empty to get all event types
limit: 1000, # optional
}) {
consumerGroup
error
event {
id
}
}
}
Resend Erroneous Events
await client.resendErroneousEvent("tenant-id", "topic", "consumer-group", "event-id");
if err := client.ResendErroneousEvent(
context.TODO(),
"tenant-id",
"topic",
"consumer-group",
"event-id",
); err != nil {
panic(err)
}
mutation ResendErroneousEvent {
resendErroneousEvent(input: {
tenantId: "tenant-id",
topic: "topic",
consumerGroup: "consumer-group",
eventId: "event-id",
})
}