Streaming Ingest API

The Streaming Ingest API is a tenant-specific API that supports real-time streaming of data from external systems to Amperity.

Important

The Streaming Ingest API must be enabled for use in Amperity. Contact your support representative via the Amperity Support Portal (or send email to support@amperity.com) to request adding the Streaming Ingest API capabilities to your tenant.

Overview

The Streaming Ingest API is designed for streaming events and profile updates. It is a low latency, high throughput REST API, designed to accept billions of records per day.

The Streaming Ingest API is configured to use different streams to load data into individual feeds. For example, order events might be sent to one stream while profile updates are sent to another. Each stream has its own endpoint: /stream/v0/data/<stream-id>.

The Streaming Ingest API supports the following payload types:

  1. JSON (preferred), which Amperity converts to NDJSON

  2. XML, which Amperity converts to CBOR

A stream may only use one payload type.

API Keys and JWTs

Amperity uses a JSON Web Token (JWT) for authentication to the Streaming Ingest API. A single access token may be used to access any endpoint in your tenant’s Streaming Ingest API.

The access token is self-generated from the Amperity user interface and authorizes write access to the Streaming Ingest API for your tenant. A self-generated access token ensures that only your team has access to the token and supports organizational security policies like periodic access token rotation.

The access token must be available to each request made to the Streaming Ingest API.

Note

More information about Amperity API keys is available, including how to add, delete, and rotate API keys and how to generate and refresh access tokens.

Add API key

A write access token enables your upstream systems to write data to the Streaming Ingest API.

To add an API key for the Streaming Ingest API

Step 1.

Open the Users & Activity page. Under API keys click Add API key.

Step 2.

From the Add API key dialog, add the name for the API key, select the Streaming Ingest Write Access option, and then click Save.


Generate an access token

Access tokens that enable authentication to the Amperity API are managed directly from the Users & Activity page in Amperity.

To generate access tokens

Step 1.

Open the Users & Activity page.

Step 2.

Under API keys, find the API key for which you want to generate an access token, and then from the Actions menu select Generate access token.

Step 3.

Select the number of days this token will allow access to the API, after which it will expire. For example, 3 days:


The token is generated, and then automatically copied to your clipboard.


Important

You are the only person who will have access to the newly generated access token. Amperity does not save the access token anywhere, and it will disappear when you close this dialog. Store the access token in a safe place.

Send to data streams

A data stream is generated for you by Amperity on request. Contact your support representative via the Amperity Support Portal (or send email to support@amperity.com) to request a new data stream.

Data can be sent to the Streaming Ingest API by issuing POST requests to the /stream/v0/data/<stream-id> endpoint.

Important

Amperity does not enforce any particular data schema. Each stream carries a single schema, determined by the data being sent to it. You cannot have multiple data schemas on a single stream; instead, use multiple streams to support multiple schemas.
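Because each stream carries exactly one schema, an upstream producer typically keeps a small mapping from record type to stream ID. A minimal shell sketch; the stream IDs below are hypothetical placeholders (real IDs are provisioned by Amperity support):

```shell
#!/bin/sh
# Hypothetical stream IDs; real values are provisioned by Amperity support.
ORDER_STREAM_ID="st-orders-example"
PROFILE_STREAM_ID="st-profiles-example"

# Map a record type to the stream that carries its schema.
stream_for() {
  case "$1" in
    order)   echo "$ORDER_STREAM_ID" ;;
    profile) echo "$PROFILE_STREAM_ID" ;;
    *)       echo "unknown record type: $1" >&2; return 1 ;;
  esac
}
```

The value returned by stream_for is interpolated into the /stream/v0/data/<stream-id> endpooint path, so each schema lands on its own stream.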

About Postman

Postman is a collaboration platform for API development that you can use to send data to Amperity through the Streaming Ingest REST API.

Amperity will provide complete details for using a Postman collection for the Streaming Ingest REST API when your tenant is initialized. Use this template as the starting point for building out the API stream for your data source.

Send JSON using HTTP

To send JSON data to a stream using HTTP, submit a request similar to:

POST /stream/v0/data/<stream-id> HTTP/1.1
Host: <tenant-name>.amperity.com
Content-Type: application/json
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164

{"field1": "value1",
 "field2": "value2"}

Send JSON using cURL

To send JSON data to a stream using cURL, submit a request similar to:

curl -XPOST \
-H "Content-Type: application/json" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
' {"field1": "value1",
   "field2": "value2"}'

Send XML using HTTP

To send XML data to a stream using HTTP, submit a request similar to:

POST /stream/v0/data/<stream-id> HTTP/1.1
Host: <tenant-name>.amperity.com
Content-Type: application/xml
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164

<records>
  <record>
    <field1>value1</field1>
    ...
  </record>
  <record>
    <field1>value2</field1>
    ...
  </record>
</records>

Send XML using cURL

To send XML data to a stream using cURL, submit a request similar to:

curl -XPOST \
-H "Content-Type: application/xml" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
'<records>
  <record>
    <field1>value1</field1>
  </record>
  <record>
    <field1>value2</field1>
  </record>
</records>'

HTTP response status codes

The Streaming Ingest API returns the following HTTP status codes:

  HTTP code   Description                                              Retry?

  202         Accepted.                                                N/A

  400         Request malformed. (XML payloads are not checked         No
              synchronously; a 202 response does not guarantee that
              XML payloads will be parsable downstream.)

  401         Unauthorized. JWT could not be verified or is expired.   No

  413         Request is too large. (Amperity limits the maximum       No
              payload size to 254 KB.)

  429         Request throttled. (The request limit is set above the   Yes
              expected traffic volume.)

  500         Internal error.                                          Yes

  503         Service unavailable.                                     Yes

  504         Gateway timeout.                                         Yes

Note

All retries have exponential back off.
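Per the table above, only 429, 500, 503, and 504 should be retried, with exponential back off. A shell sketch of that policy; the base delay and doubling schedule are illustrative assumptions, not documented values:

```shell
#!/bin/sh
# Retry only the status codes the table marks retryable.
is_retryable() {
  case "$1" in
    429|500|503|504) return 0 ;;
    *) return 1 ;;
  esac
}

# Exponential back off: 1s, 2s, 4s, 8s, ... for attempts 1, 2, 3, 4, ...
backoff_seconds() {
  echo $((1 << ($1 - 1)))
}
```

In a send loop, you might capture the status with curl -s -o /dev/null -w '%{http_code}' and sleep "$(backoff_seconds "$attempt")" before re-sending while is_retryable "$status" holds.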

Load stream data

Once data is sent to a stream, it is batched and staged for ingest. As with file-based data, streamed data is loaded into feeds through couriers from the Sources page.

Note

  • The Streaming Ingest API only accepts individual JSON payloads (and does not accept NDJSON payloads)

  • JSON payloads are combined into a single NDJSON file

  • Nested JSON payloads require a saved query to flatten the data

  • XML payloads are converted into CBOR by the streaming API

  • CBOR requires a saved query to transform the data into a tabular format

Load simple JSON data

Simple JSON data is batched together into NDJSON files that can be loaded directly to Amperity. A simple JSON schema does not contain nested values, which means that none of the values in the schema are JSON objects or arrays. For example:

{"field1": "value1",
 "field2": "value2"}

NDJSON data is loaded to Amperity using the NDJSON file format. Configure courier load settings and operations, and then define a feed.
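Amperity performs this batching server-side, but the shape of the result is easy to illustrate: each JSON payload becomes one compact line of the NDJSON file. A sketch using only POSIX tools (to_ndjson is a hypothetical helper, not part of Amperity):

```shell
#!/bin/sh
# Combine individual JSON payload files into one NDJSON file:
# strip internal newlines from each payload, then emit it as a single line.
# (JSON strings cannot contain raw newlines, so this is safe for valid JSON.)
to_ndjson() {
  for f in "$@"; do
    tr -d '\n' < "$f"
    printf '\n'
  done
}
```

Usage: to_ndjson payload1.json payload2.json > batch.ndjson.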

Load nested JSON data

Nested JSON data requires a saved query to parse the nested values, after which the data is parsed into NDJSON files that can be loaded directly to Amperity. A nested JSON schema has values that are JSON objects or arrays. For example:

{"field1": "value1",
 "field2": {"nested-field1": "nested-value1",
            "nested-field2": "nested-value2"}}

Nested NDJSON data is loaded to Amperity using the NDJSON file format. Define an ingest query to flatten the data into a tabular format, configure courier load settings and operations, and then define a feed.

Load CBOR data

Streamed XML data that has been converted to CBOR format must first be flattened into a tabular format using a saved query before it can be loaded to Amperity. Use Spark SQL to extract any part of the CBOR file, and then format it into columns.

A saved query is a SQL statement that may be applied to data prior to loading it to a domain table. A saved query is defined using Spark SQL syntax.

Tip

Use Databricks to design the saved query workflow. In most cases you can design a query against a stream that is located in the container that comes with the Amperity tenant. This is an Amazon S3 bucket or an Azure Blob Storage container, depending on the cloud platform in which your tenant runs.

  1. Connect Databricks to the container.

  2. Load the CBOR file from the container to Databricks.

  3. Define a SQL query that shapes the data.

  4. Create a sample file, and then use it to add a feed.

XML data sent to the Streaming Ingest API is loaded to Amperity using the CBOR file format. Define an ingest query, configure courier load settings and operations, and then define a feed.