Events

Data Mesh Manager provides an HTTP Feed endpoint to subscribe to events.

Events are published in the feed in near-realtime when significant events happen to a data product or a data usage agreement. You can implement a component that runs in your data platform and trigger further actions, such as permission granting or revoking.

HTTP Feed

An HTTP feed provides an HTTP GET endpoint that returns a chronological sequence (!) of events serialized in CloudEvents format in batched responses using the media type application/cloudevents-batch+json.

It uses the lastEventId query parameter to scroll through further items and supports infinite polling for near-realtime feed subscriptions.

When implementing a consumer, you need to have a long-running process (e.g., in a Docker container) and a persistent storage (e.g., a database or a file in an S3 bucket) to persist the lastEventId.

Read more

Kafka Connector

You can set up a Kafka HTTP Source Connector to subscribe the HTTP Feed.


Subscription

An events feed client can poll in an infinite loop to subscribe to a feed:

Pseudocode

endpoint = "https://api.datamesh-manager.com/api/events"
lastEventId = read lastEventId from database or else null

while true:
  try:
    response = GET endpoint + "?lastEventId=" + lastEventId
    for event in response:
      process event
      lastEventId = event.id
      persist lastEventId
  except:
    // Delay the next request only in case of a server error
    wait 5 seconds

A client retrieves a batched response (max. 1000 events per response) and processes one event after another. After processing one event, the client persists the event.id as lastEventId and continues processing the events in this response. When all entries are processed, the client immediately continues with the next request with the last persisted lastEventId. When the feed contains no newer events, the server waits some seconds until new data is retrieved, or it sends an empty array []. Then the client immediately continues with the next request with the last persisted lastEventId. In case of an error, the client needs to wait for 5 seconds, until the next request is sent.


GET/api/events

Get events

This endpoint allows you to retrieve a batch of events, starting after the lastEventId.

Optional attributes

  • Name
    lastEventId
    Type
    string
    Required
    Description

    The id (from the CloudEvent envelope) of the last processed event.

Request

GET
/api/events
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1c82203f-f792-4fc3-94cb-59141f1d0793" \
--header "x-api-key: $DMM_API_KEY"

Response

[
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e6a7-60e2-ae80-2788ab757ee0",
  "type": "com.datamesh-manager.events.DataUsageAgreementRequestedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.052693Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/DataUsageAgreementRequestedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.052636441Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e7c9-6953-ae80-2d4755b4bfa9",
  "type": "com.datamesh-manager.events.DataUsageAgreementApprovedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.216357Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/DataUsageAgreementApprovedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.216330881Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e7f0-6a54-ae80-033fc3b099d2",
  "type": "com.datamesh-manager.events.DataUsageAgreementActivatedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.232604Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/DataUsageAgreementActivatedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.232578049Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f",
  "type": "com.datamesh-manager.events.DataUsageAgreementDeactivatedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-19T13:54:56.482679Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/DataUsageAgreementDeactivatedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-19T13:54:56.478285988Z"
  }
}
]

Request

GET
/api/events
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f" \
--header "x-api-key: $DMM_API_KEY"

Response

[]

The client can filter for relevant event types. And retrieve the current state of the relevant resource by its data.id.