Streaming Ingest API

The Streaming Ingest API is a tenant-specific API that supports real-time streaming of data from external systems to Amperity.

Overview

The Streaming Ingest API is designed for streaming events and profile updates. It is a low latency, high throughput REST API, designed to accept billions of records per day.

The Streaming Ingest API is configured to use different streams to load data into individual feeds. For example, order events might be sent to one stream while profile updates are sent to another. Individual streams have a distinguished endpoint /stream/v0/data/<stream-id>.

The Streaming Ingest API supports the following payload types:

  1. JSON (preferred), which converts streaming data to NDJSON

  2. XML, which converts streaming data to CBOR

A stream may only be one payload type.

API Keys and JWTs

Amperity uses a JSON Web Token (JWT) for authentication to the Streaming Ingest API. A single access token may be used to access any endpoint in your tenant’s Streaming Ingest API.

The access token is self-generated from the Amperity user interface and authorizes write access to the Streaming Ingest API for your tenant. A self-generated access token ensures that only your team has access to the token and supports organizational security policies like periodic access token rotation.

The access token must be available to each request made to the Streaming Ingest API.

Note

More information about Amperity API keys is available, including sections about adding API keys, deleting API tokens, rotating API tokens, generating access tokens, and refreshing access tokens.

Add API key

A write access token enables your upstream use cases to write data to the Streaming Ingest API.

To add an access token for the Streaming Ingest API

Step 1.

Open the Settings page, and then select the Security tab. Under API keys click Add API key.

Step 2.

From the Add API key dialog, add the name for the API key, select the Streaming Ingest Write Access option, and then click Save.

Generate an API key.

Generate an access token

Access tokens that enable authentication to the Amperity API are managed directly from the Users & Activity page in Amperity.

To generate access tokens

Step 1.

Open the Users & Activity page.

Step 2.

Under API keys find the API key for which you want to generate an access token, and then from the Actions menu select “Generate access token”.

Generate an access token.
Step 3.

Select the number of days this token will allow access to the API, after which it will expire. For example, 3 days:

Generate an access token.

The token is generated, and then automatically copied to your clipboard.

Generate an access token.

Important

You are the only person who will have access to the newly-generated access key. Amperity does not save the access key anywhere and it will disappear when you close this dialog. Store the access key in a safe place.

Configure endpoints

You can self-manage the endpoints your brand uses to stream data to Amperity.

To manage Streaming Ingest API endpoints

Step 1.

Open the Sources page.

Step 2.

Under Streaming Ingest click Add stream.

Add a Streaming Ingest API endpoint.

Enter a name and description for the Streaming Ingest API endpoint.

Add a name and description for the Streaming Ingest API endpoint.
Step 2.

The ID for the Streaming Ingest API endpoint is available from the Stream ID column:

Get the ID for the Streaming Ingest API endpoint.

Use this identifier in the path for the POST request when sending data to the Streaming Ingest API endpoint.

For example:

POST /stream/v0/data/is-AbCDefGH HTTP/1.1

Send to data streams

A data stream is generated for you by Amperity on request. Contact your support representative via the Amperity Support Portal (or send email to support@amperity.com) to request a new data stream.

Data can be sent to the Streaming Ingest API by issuing POST requests to the /stream/v0/data/<stream-id> endpoint.

Important

Amperity does not enforce any particular data schema. Each data schema is a unique stream that depends on what is being sent. You cannot have multiple data schemas on a single stream, instead use multiple streams to support multiple schemas.

About Postman

Postman is a collaboration platform for API development that enables support for sending data to Amperity using the Streaming Ingest REST API.

Amperity will provide complete details for using a Postman collection for the Streaming Ingest REST API when your tenant is initialized. Use this template as the starting point for building out the API stream for your data source.

Send JSON using HTTP

To send JSON data to a stream using HTTP, submit a request similar to:

POST /stream/v0/data/<stream-id> HTTP/1.1
Host: https://<tenant-name>.amperity.com
Content-Type: application/json
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164

{"field1": "value1",
 "field2": "value2"}

Send JSON using cURL

To send JSON data to a stream using cURL, submit a request similar to:

curl -XPOST \
-H "Content-Type: application/json" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
' {"field1": "value1",
   "field2": "value2"}'

Send XML using HTTP

To send XML data to a stream using HTTP, submit a request similar to:

POST /stream/v0/data/<stream-id> HTTP/1.1
Host: https://<tenant-name>.amperity.com
Content-Type: application/xml
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164

 <records>
 <record>
 <field1>value1</field1>
 ...
 </record>
 <record>
 <field1>value2</field1>
 ... t
 </record>
 </records>

Send XML using cURL

To send XML data to a stream using cURL, submit a request similar to:

curl -XPOST \
-H "Content-Type: application/xml" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
'<records>
 <record>
 <field1>value1</field1>
 </record>
 <record>
 <field1>value2</field1>
 </record>
 </records>
'

HTTP response status codes

The Streaming Ingest API has the following HTTP status codes:

HTTP code

Description

Retry?

202

Accepted.

N/A

400

Request malformed.

Note

XML payloads are not checked synchronously; a 202 response does not guarantee that XML payloads will be parsable downstream.

No

401

Unauthorized. JWT could not be verified or is expired.

No

413

Request is too large.

Note

Amperity limits the maximum payload size to 500 kb.

No

429

Request throttled.

Note

The request limit is set above the expected traffic volume.

Yes

500

Internal error.

Yes

503

Service unavailable.

Yes

504

Gateway timeout.

Note

All retries have exponential back off.

Yes

Load stream data

Once data is sent to a stream, it is batched and collected to be made ready for ingest. Similar to loading files-based data, streamed data is loaded into feeds through couriers and is done from the Sources page.

Note

  • The Streaming Ingest API only accepts individual JSON payloads (and does not accept NDJSON payloads)

  • JSON payloads are combined into a single NDJSON file

  • Nested JSON payloads require a saved query to flatten the data

  • XML payloads are converted into CBOR by the streaming API

  • CBOR requires a saved query to transform the data into a tabular format

Load simple JSON data

Simple JSON data is batched together into NDJSON files that can be loaded directly to Amperity. A simple JSON schema does not contain nested values, which means that none of the values in the schema are JSON objects or arrays. For example:

{'field1': 'value1',
 'field2': 'value2'}

NDJSON data is loaded to Amperity using the NDJSON file format. Configure a courier load settings and operations, and then define a feed.

Load nested JSON data

Nested JSON data requires a saved query to parse the nested values, after which the data is parsed into NDJSON files that can be loaded directly to Amperity. A nested JSON schema has values that are JSON objects or arrays. For example:

{'field1': 'value1',
 'field2': {'nested-field1': 'nested-value1',
            'nested-field2': 'nested-value2'}}

Nested NDJSON data is loaded to Amperity using the NDJSON file format. Define an ingest query to flatten the data into a tabular format, configure a courier load settings and operations, and then define a feed.

Load CBOR data

To load streamed XML data that has been converted to CBOR format into Amperity, it must first be flattened into tabular format using a saved query. Use Spark SQL to extract any part of the CBOR file, and then format it into columns.

A saved query is a SQL statement that may be applied to data prior to loading it to a domain table. A saved query is defined using Spark SQL syntax.

Tip

Use Databricks to design the saved query workflow. In most cases you can design a query against a stream that is located in the container that comes with the Amperity tenant. This is an Amazon S3 bucket or an Azure Blob Storage container, depending on the cloud platform in which your tenant runs.

  1. Connect Databricks to the container.

  2. Load the CBOR file from the container to Databricks.

  3. Define a SQL query that shapes the data.

  4. Create a sample file, and then use it to add a feed, below.

XML data sent to the Streaming Ingest API is loaded to Amperity using the CBOR file format. Define an ingest query, configure courier load settings and operations, and then define a feed.

Pull from Streaming Ingest

The Streaming Ingest API is a tenant-specific API that supports real-time streaming of data from external systems to Amperity.

This topic describes the steps that are required to pull streamed data to Amperity from Streaming Ingest:

  1. Add courier

  2. Get sample files

  3. Add feeds

  4. Add load operations

  5. Run courier

  6. Add to courier group

Add courier

The Streaming Ingest courier pulls your data from the location that the Streaming Ingest API streams data to Amperity. A courier is required for each data stream.

Tip

You can run a courier with an empty load operation using {} as the value for the load operation. Use this approach to get files to upload during feed creation, as a feed requires knowing the schema of a file before you can apply semantic tagging and other feed configuration settings.

To add a courier for Streaming Ingest

  1. From the Sources page, click Add Courier. The Add Source page opens.

  2. Find, and then click the icon for Streaming Ingest. The Add Courier page opens.

  3. Enter the name of the courier. For example: “Streaming Ingest”.

  4. A courier that pulls data that was streamed to Amperity by the Streaming Ingest API does not require a credential even though the configuration steps will ask you to provide a credential. Create a new credential, name it “<tenant>-streaming-ingest” and give it a description like “Pull streams to Amperity for Streaming Ingest API”.

  5. Under Streaming Ingest Settings, add the Streaming Ingest endpoint ID which is available from the Stream ID column in the Sources page.

    Specify the File format, which can be XML, NDJSON, or JSON. If you are sending JSON data, Amperity will batch your data into NDJSON so select that if you’re sending JSON data.

    Set the File tag. This is usually just streaming. Set this within the file tag in load operations and the file tag text box.

    Enter the File pattern prefix, which is useful for time based ingestion of streaming data. This setting may be configured to load data on an hourly basis. Possible values range from 00 - 24, each of which represents an hour in a 24 hour window. For example, use 00 to load data at 12:00 AM, 08 to load data at 8:00 AM, or 12 to load data at 12:00 PM. A courier may only be configured to use a single file pattern prefix.

  6. Set the load operations to a string that is obviously incorrect, such as df-xxxxxx. (You may also set the load operation to empty: “{}”.)

    Tip

    If you use an obviously incorrect string, the load operation settings will be saved in the courier configuration. After the schema for the feed is defined and the feed is activated, you can edit the courier and replace the feed ID with the correct identifier.

    Caution

    If load operations are not set to “{}” the validation test for the courier configuration settings will fail.

  7. Click Save.

Get sample files

Every Streaming Ingest file that is pulled to Amperity must be configured as a feed. Before you can configure each feed you need to know the schema of that file. Run the courier without load operations to bring sample files from Streaming Ingest to Amperity, and then use each of those files to configure a feed.

To get sample files

  1. From the Sources tab, open the menu for a courier configured for Streaming Ingest with empty load operations, and then select Run. The Run Courier dialog box opens.

  2. Select Load data from a specific day, and then select today’s date.

  3. Click Run.

    Important

    The courier run will fail, but this process will successfully return a list of files from Streaming Ingest.

    These files will be available for selection as an existing source from the Add Feed dialog box.

  4. Wait for the notification for this courier run to return an error similar to:

    Error running load-operations task
    Cannot find required feeds: "df-xxxxxx"
    

Add feeds

A feed defines how data should be loaded into a domain table, including specifying which columns are required and which columns should be associated with a semantic tag that indicates that column contains customer profile (PII) and transactions data.

Note

A feed must be added for each file that is pulled from Streaming Ingest, including all files that contain customer records and interaction records, along with any other files that will be used to support downstream workflows.

To add a feed

  1. From the Sources tab, click Add Feed. This opens the Add Feed dialog box.

  2. Under Data Source, select Create new source, and then enter “Streaming Ingest”.

  3. Enter the name of the feed in Feed Name. For example: “WebEvents”.

    Tip

    The name of the domain table will be “<data-source-name>:<feed-name>”. For example: “Streaming Ingest:WebEvents”.

  4. Under Sample File, select Select existing file, and then choose from the list of files. For example: “filename_YYYY-MM-DD.json”.

    Tip

    The list of files that is available from this drop-down menu is sorted from newest to oldest.

  5. Select Load sample file on feed activation.

  6. Click Continue. This opens the Feed Editor page.

  7. Select the primary key.

  8. Apply semantic tags to customer records and interaction records, as appropriate.

  9. Under Last updated field, specify which field best describes when records in the table were last updated.

    Tip

    Choose Generate an “updated” field to have Amperity generate this field. This is the recommended option unless there is a field already in the table that reliably provides this data.

  10. For feeds with customer records (PII data), select Make available to Stitch.

  11. Click Activate. Wait for the feed to finish loading data to the domain table, and then review the sample data for that domain table from the Data Explorer.

Add load operations

After the feeds are activated and domain tables are available, add the load operations to the courier used for Streaming Ingest.

Example load operations

Load operations must specify each file that will be pulled to Amperity from Streaming Ingest.

For example:

{
  "FEED_ID": [
    {
      "type": "load",
      "file": "file_tag"
    }
  ]
}

To add load operations

  1. From the Sources tab, open the menu for the courier that was configured for Streaming Ingest, and then select Edit. The Edit Courier dialog box opens.

  2. Edit the load operations for each of the feeds that were configured for Streaming Ingest so they have the correct feed ID.

  3. Click Save.

Run courier manually

Run the courier again. This time, because the load operations are present and the feeds are configured, the courier will pull data from Streaming Ingest.

To run the courier manually

  1. From the Sources tab, open the    menu for the courier with updated load operations that is configured for Streaming Ingest, and then select Run. The Run Courier dialog box opens.

  2. Select the load option, either for a specific time period or all available data. Actual data will be loaded to a domain table because the feed is configured.

  3. Click Run.

    This time the notification will return a message similar to:

    Completed in 5 minutes 12 seconds
    

Add to courier group

A courier group is a list of one (or more) couriers that are run as a group, either ad hoc or as part of an automated schedule. A courier group can be configured to act as a constraint on downstream workflows.

Important

Be sure to configure a courier that is associated with a streaming endpoint to have a 24-hour offset. This will ensure that all data that is streamed to the endpoint will be available to the scheduled workflow.

To add the courier to a courier group

  1. From the Sources tab, click Add Courier Group. This opens the Create Courier Group dialog box.

  2. Enter the name of the courier. For example: “Streaming Ingest”.

  3. Add a cron string to the Schedule field to define a schedule for the orchestration group.

    A schedule defines the frequency at which a courier group runs. All couriers in the same courier group run as a unit and all tasks must complete before a downstream process can be started. The schedule is defined using cron.

    Cron syntax specifies the fixed time, date, or interval at which cron will run. Each line represents a job, and is defined like this:

    ┌───────── minute (0 - 59)
    │ ┌─────────── hour (0 - 23)
    │ │ ┌───────────── day of the month (1 - 31)
    │ │ │ ┌────────────── month (1 - 12)
    │ │ │ │ ┌─────────────── day of the week (0 - 6) (Sunday to Saturday)
    │ │ │ │ │
    │ │ │ │ │
    │ │ │ │ │
    * * * * * command to execute
    

    For example, 30 8 * * * represents “run at 8:30 AM every day” and 30 8 * * 0 represents “run at 8:30 AM every Sunday”. Amperity validates your cron syntax and shows you the results. You may also use crontab guru to validate cron syntax.

  4. Set Status to Enabled.

  5. Specify a time zone.

    A courier group schedule is associated with a time zone. The time zone determines the point at which a courier group’s scheduled start time begins. A time zone should be aligned with the time zone of system from which the data is being pulled.

    Use the Use this time zone for file date ranges checkbox to use the selected time zone to look for files. If unchecked, the courier group will use the current time in UTC to look for files to pick up.

    Note

    The time zone that is chosen for an courier group schedule should consider every downstream business processes that requires the data and also the time zone(s) in which the consumers of that data will operate.

  6. Add at least one courier to the courier group. Select the name of the courier from the Courier drop-down. Click + Add Courier to add more couriers.

  7. Click Add a courier group constraint, and then select a courier group from the drop-down list.

    A wait time is a constraint placed on a courier group that defines an extended time window for data to be made available at the source location.

    Important

    A wait time is not required for a bridge.

    A courier group typically runs on an automated schedule that expects customer data to be available at the source location within a defined time window. However, in some cases, the customer data may be delayed and isn’t made available within that time window.

  8. For each courier group constraint, apply any offsets.

    A courier can be configured to look for files within range of time that is older than the scheduled time. The scheduled time is in Coordinated Universal Time (UTC), unless the “Use this time zone for file date ranges” checkbox is enabled for the courier group.

    This range is typically 24 hours, but may be configured for longer ranges. For example, it’s possible for a data file to be generated with a correct file name and datestamp appended to it, but for that datestamp to represent the previous day because of how an upstream workflow is configured. A wait time helps ensure that the data at the source location is recognized correctly by the courier.

    Warning

    This range of time may affect couriers in a courier group whether or not they run on a schedule. A manually run courier group may not take its schedule into consideration when determining the date range; only the provided input day(s) to load data from are used as inputs.

  9. Click Save.