Streaming Ingest API¶
The Streaming Ingest API is a tenant-specific API that supports real-time streaming of data from external systems to Amperity.
Overview¶
The Streaming Ingest API is designed for streaming events and profile updates. It is a low-latency, high-throughput REST API that is designed to accept billions of records per day.
The Streaming Ingest API is configured to use different streams to load data into individual feeds. For example, order events might be sent to one stream while profile updates are sent to another. Each stream has its own endpoint: /stream/v0/data/<stream-id>.
The Streaming Ingest API supports the following payload types:
JSON (preferred), which is converted to NDJSON
XML, which is converted to CBOR
A stream may accept only one payload type.
API Keys and JWTs¶
Amperity uses a JSON Web Token (JWT) for authentication to the Streaming Ingest API. A single access token may be used to access any endpoint in your tenant’s Streaming Ingest API.
The access token is self-generated from the Amperity user interface and authorizes write access to the Streaming Ingest API for your tenant. A self-generated access token ensures that only your team has access to the token and supports organizational security policies like periodic access token rotation.
The access token must be available to each request made to the Streaming Ingest API.
Note
More information about Amperity API keys is available, including sections about adding API keys, deleting API tokens, rotating API tokens, generating access tokens, and refreshing access tokens.
Add API key¶
A write access token enables your upstream use cases to write data to the Streaming Ingest API.
To add an access token for the Streaming Ingest API
Open the Settings page, and then select the Security tab. Under API keys click Add API key.
From the Add API key dialog, add the name for the API key, select the Streaming Ingest Write Access option, and then click Save.
Generate an access token¶
Access tokens that enable authentication to the Amperity API are managed directly from the Users & Activity page in Amperity.
To generate access tokens
Open the Users & Activity page.
Under API keys find the API key for which you want to generate an access token, and then from the Actions menu select “Generate access token”.
Select the number of days this token will allow access to the API, after which it will expire. For example: 3 days. The token is generated, and then automatically copied to your clipboard.
Important
You are the only person who will have access to the newly-generated access key. Amperity does not save the access key anywhere and it will disappear when you close this dialog. Store the access key in a safe place.
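Because an access token expires after the number of days you select, an upstream system may want to check how close a stored token is to expiring before it sends data. The following Python sketch reads the expiry from the token payload. It assumes the token carries the standard JWT `exp` claim (seconds since the epoch), which this topic does not confirm; the function names are illustrative and the signature is not verified.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> Optional[int]:
    """Return the `exp` claim of a JWT, or None if the claim is absent.

    Assumption: the token is a three-segment JWT with a JSON payload.
    The signature is NOT verified; only the payload segment is read.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload_b64 = parts[1]
    # base64url segments omit padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims.get("exp")


def expires_within(token: str, seconds: float, now: Optional[float] = None) -> bool:
    """True if the token has an `exp` claim that falls within `seconds` of `now`."""
    exp = jwt_expiry(token)
    if exp is None:
        return False
    current = time.time() if now is None else now
    return exp - current <= seconds
```

For example, `expires_within(token, 24 * 3600)` could be used to prompt for rotation one day before the token stops working.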
Configure endpoints¶
You can self-manage the endpoints your brand uses to stream data to Amperity.
To manage Streaming Ingest API endpoints
Open the Sources page.
Under Streaming Ingest click Add stream. Enter a name and description for the Streaming Ingest API endpoint.
The ID for the Streaming Ingest API endpoint is available from the Stream ID column. Use this identifier in the path for the POST request when sending data to the Streaming Ingest API endpoint. For example:
POST /stream/v0/data/is-AbCDefGH HTTP/1.1
Send to data streams¶
A data stream is generated for you by Amperity on request. Contact your support representative via the Amperity Support Portal (or send email to support@amperity.com) to request a new data stream.
Data can be sent to the Streaming Ingest API by issuing POST requests to the /stream/v0/data/<stream-id> endpoint.
Important
Amperity does not enforce any particular data schema. Each data schema requires its own stream, and the schema depends on what is being sent. You cannot have multiple data schemas on a single stream; use multiple streams to support multiple schemas.
About Postman
Postman is a collaboration platform for API development that can be used to send data to Amperity using the Streaming Ingest REST API.
Amperity will provide complete details for using a Postman collection for the Streaming Ingest REST API when your tenant is initialized. Use this template as the starting point for building out the API stream for your data source.
Send JSON using HTTP¶
To send JSON data to a stream using HTTP, submit a request similar to:
POST /stream/v0/data/<stream-id> HTTP/1.1
Host: <tenant-name>.amperity.com
Content-Type: application/json
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164
{"field1": "value1",
"field2": "value2"}
Send JSON using cURL¶
To send JSON data to a stream using cURL, submit a request similar to:
curl -XPOST \
-H "Content-Type: application/json" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
' {"field1": "value1",
"field2": "value2"}'
Send XML using HTTP¶
To send XML data to a stream using HTTP, submit a request similar to:
POST /stream/v0/data/<stream-id> HTTP/1.1
Host: <tenant-name>.amperity.com
Content-Type: application/xml
X-Amperity-Tenant: <tenant-name>
Authorization: Bearer <Streaming Ingest JWT token>
Content-Length: 32164
<records>
<record>
<field1>value1</field1>
...
</record>
<record>
<field1>value2</field1>
...
</record>
</records>
Send XML using cURL¶
To send XML data to a stream using cURL, submit a request similar to:
curl -XPOST \
-H "Content-Type: application/xml" \
-H "X-Amperity-Tenant: <tenant-name>" \
-H "Authorization: Bearer <Streaming Ingest JWT token>" \
https://<tenant-name>.amperity.com/stream/v0/data/<stream-id> \
--data-binary \
'<records>
<record>
<field1>value1</field1>
</record>
<record>
<field1>value2</field1>
</record>
</records>
'
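The JSON and XML requests shown above can also be built from application code. The following Python sketch uses only the standard library and builds the request without sending it. The function names are illustrative, and reading the documented 254 kb payload limit as 254 * 1024 bytes is an assumption.

```python
import json
import urllib.request
import xml.etree.ElementTree as ET

# Assumption: the documented "254 kb" limit is read here as 254 * 1024 bytes.
MAX_PAYLOAD_BYTES = 254 * 1024


def build_request(tenant: str, stream_id: str, token: str,
                  body: bytes, content_type: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for a single stream."""
    if len(body) > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"payload is {len(body)} bytes; assumed limit is {MAX_PAYLOAD_BYTES}"
        )
    url = f"https://{tenant}.amperity.com/stream/v0/data/{stream_id}"
    headers = {
        "Content-Type": content_type,
        "X-Amperity-Tenant": tenant,
        "Authorization": f"Bearer {token}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def json_body(record: dict) -> bytes:
    """Serialize one record as a single JSON payload (not NDJSON)."""
    return json.dumps(record).encode("utf-8")


def xml_body(rows: list) -> bytes:
    """Serialize rows as <records><record>...</record></records>.

    Keys must already be valid XML element names; they are not checked here.
    """
    root = ET.Element("records")
    for row in rows:
        record = ET.SubElement(root, "record")
        for name, value in row.items():
            ET.SubElement(record, name).text = str(value)
    return ET.tostring(root, encoding="utf-8")
```

A request built this way can be sent with `urllib.request.urlopen(request)`. Use `application/json` with `json_body` and `application/xml` with `xml_body`, matching the payload type of the stream.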
HTTP response status codes¶
The Streaming Ingest API has the following HTTP status codes:
HTTP code | Description | Retry? |
---|---|---|
202 | Accepted. | N/A |
400 | Request malformed. Note: XML payloads are not checked synchronously; a 202 response does not guarantee that XML payloads will be parsable downstream. | No |
401 | Unauthorized. JWT could not be verified or is expired. | No |
413 | Request is too large. Note: Amperity limits the maximum payload size to 254 kb. | No |
429 | Request throttled. Note: The request limit is set above the expected traffic volume. | Yes |
500 | Internal error. | Yes |
503 | Service unavailable. | Yes |
504 | Gateway timeout. Note: All retries have exponential back off. | Yes |
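The retry guidance in the table can be sketched as a small client-side helper. The set of retryable codes is taken from the table; the base delay, the cap, and the number of retries are assumptions, and `send` is a hypothetical callable that issues one request and returns its HTTP status code.

```python
import time
from typing import Callable, List

# Status codes marked "Yes" in the Retry? column.
RETRYABLE_STATUSES = frozenset({429, 500, 503, 504})


def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Exponentially growing delays in seconds: base, 2*base, 4*base, ... up to cap."""
    return [min(cap, base * (2 ** n)) for n in range(retries)]


def send_with_retry(send: Callable[[], int], retries: int = 5,
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Call `send` until it returns a non-retryable status or retries run out."""
    status = send()
    for delay in backoff_delays(retries):
        if status not in RETRYABLE_STATUSES:
            break
        sleep(delay)
        status = send()
    return status
```

Codes such as 400, 401, and 413 are returned immediately, because repeating the same request would fail in the same way.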
Load stream data¶
Once data is sent to a stream, it is batched and collected to be made ready for ingest. Similar to loading file-based data, streamed data is loaded into feeds through couriers, and this is done from the Sources page.
Note
The Streaming Ingest API only accepts individual JSON payloads (and does not accept NDJSON payloads)
JSON payloads are combined into a single NDJSON file
Nested JSON payloads require a saved query to flatten the data
XML payloads are converted into CBOR by the streaming API
CBOR requires a saved query to transform the data into a tabular format
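To illustrate the difference between an individual JSON payload and the NDJSON file it ends up in, the following Python sketch combines several payloads into NDJSON text, one compact JSON object per line. This batching is performed by Amperity after data is received; the function below is only an illustration of the resulting file shape and is not something a client sends.

```python
import json


def to_ndjson(payloads: list) -> str:
    """Join individual JSON payloads into NDJSON: one object per line."""
    return "".join(
        json.dumps(payload, separators=(",", ":")) + "\n" for payload in payloads
    )
```

Each line of the result can be parsed on its own with `json.loads`, which is what makes the format suitable for loading records row by row.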
Load simple JSON data¶
Simple JSON data is batched together into NDJSON files that can be loaded directly to Amperity. A simple JSON schema does not contain nested values, which means that none of the values in the schema are JSON objects or arrays. For example:
{"field1": "value1",
"field2": "value2"}
NDJSON data is loaded to Amperity using the NDJSON file format. Configure courier load settings and operations, and then define a feed.
Load nested JSON data¶
Nested JSON data requires a saved query to parse the nested values, after which the data is parsed into NDJSON files that can be loaded directly to Amperity. A nested JSON schema has values that are JSON objects or arrays. For example:
{"field1": "value1",
"field2": {"nested-field1": "nested-value1",
"nested-field2": "nested-value2"}}
Nested NDJSON data is loaded to Amperity using the NDJSON file format. Define an ingest query to flatten the data into a tabular format, configure courier load settings and operations, and then define a feed.
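The distinction between simple and nested JSON, and the tabular shape that flattening produces, can be sketched in Python. In Amperity the flattening is done by a saved query written in Spark SQL; the code below only illustrates the target shape. The dotted column names are an assumption, and arrays are left as they are.

```python
def has_nested_values(record: dict) -> bool:
    """True if any value is a JSON object or array, meaning the record is nested."""
    return any(isinstance(value, (dict, list)) for value in record.values())


def flatten(record: dict, parent: str = "", sep: str = ".") -> dict:
    """Flatten nested objects into one level, joining key names with `sep`."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into nested objects and merge their flattened keys.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat
```

Applied to the nested example above, `flatten` yields the columns `field1`, `field2.nested-field1`, and `field2.nested-field2`.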
Load CBOR data¶
To load streamed XML data that has been converted to CBOR format into Amperity, it must first be flattened into tabular format using a saved query. Use Spark SQL to extract any part of the CBOR file, and then format it into columns.
A saved query is a SQL statement that may be applied to data prior to loading it to a domain table. A saved query is defined using Spark SQL syntax.
Tip
Use Databricks to design the saved query workflow. In most cases you can design a query against a stream that is located in the container that comes with the Amperity tenant. This is an Amazon S3 bucket or an Azure Blob Storage container, depending on the cloud platform in which your tenant runs.
Connect Databricks to the container.
Load the CBOR file from the container to Databricks.
Define a SQL query that shapes the data.
Create a sample file, and then use it to add a feed, below.
XML data sent to the Streaming Ingest API is loaded to Amperity using the CBOR file format. Define an ingest query, configure courier load settings and operations, and then define a feed.
Pull from Streaming Ingest¶
This topic describes the steps that are required to pull streamed data to Amperity from Streaming Ingest:
Add courier¶
The Streaming Ingest courier pulls your data from the location to which the Streaming Ingest API streams it. A courier is required for each data stream.
Tip
You can run a courier with an empty load operation using {} as the value for the load operation. Use this approach to get files to upload during feed creation, as a feed requires knowing the schema of a file before you can apply semantic tagging and other feed configuration settings.
To add a courier for Streaming Ingest
From the Sources page, click Add Courier. The Add Source page opens.
Find, and then click the icon for Streaming Ingest. The Add Courier page opens.
Enter the name of the courier. For example: “Streaming Ingest”.
A courier that pulls data that was streamed to Amperity by the Streaming Ingest API does not require a credential even though the configuration steps will ask you to provide a credential. Create a new credential, name it “<tenant>-streaming-ingest” and give it a description like “Pull streams to Amperity for Streaming Ingest API”.
Under Streaming Ingest Settings, add the Stream ID which is available from the Stream ID column.
Specify the File format, which can be XML, JSON, or NDJSON.
Set the File tag. This must be identical to the file tag within the load operation.
Enter the File pattern prefix, which is useful for time-based ingestion of streaming data. This setting may be configured to load data on an hourly basis. Possible values range from 00-24, each of which represents an hour in a 24-hour window. For example, use 00 to load data at 12:00 AM, 08 to load data at 8:00 AM, or 12 to load data at 12:00 PM. A courier may only be configured to use a single file pattern prefix.
Set the load operations to a string that is obviously incorrect, such as df-xxxxxx. (You may also set the load operation to empty: “{}”.)
Tip
If you use an obviously incorrect string, the load operation settings will be saved in the courier configuration. After the schema for the feed is defined and the feed is activated, you can edit the courier and replace the feed ID with the correct identifier.
Caution
If load operations are not set to “{}” the validation test for the courier configuration settings will fail.
Click Save.
Get sample files¶
Every Streaming Ingest file that is pulled to Amperity must be configured as a feed. Before you can configure each feed you need to know the schema of that file. Run the courier without load operations to bring sample files from Streaming Ingest to Amperity, and then use each of those files to configure a feed.
To get sample files
From the Sources tab, open the menu for a courier configured for Streaming Ingest with empty load operations, and then select Run. The Run Courier dialog box opens.
Select Load data from a specific day, and then select today’s date.
Click Run.
Important
The courier run will fail, but this process will successfully return a list of files from Streaming Ingest.
These files will be available for selection as an existing source from the Add Feed dialog box.
Wait for the notification for this courier run to return an error similar to:
Error running load-operations task Cannot find required feeds: "df-xxxxxx"
Add feeds¶
A feed defines how data should be loaded into a domain table, including specifying which columns are required and which columns should be associated with a semantic tag that indicates that column contains customer profile (PII) and transactions data.
Note
A feed must be added for each file that is pulled from Streaming Ingest, including all files that contain customer records and interaction records, along with any other files that will be used to support downstream workflows.
To add a feed
From the Sources tab, click Add Feed. This opens the Add Feed dialog box.
Under Data Source, select Create new source, and then enter “Streaming Ingest”.
Enter the name of the feed in Feed Name. For example: “WebEvents”.
Tip
The name of the domain table will be “<data-source-name>:<feed-name>”. For example: “Streaming Ingest:WebEvents”.
Under Sample File, select Select existing file, and then choose from the list of files. For example: “filename_YYYY-MM-DD.json”.
Tip
The list of files that is available from this drop-down menu is sorted from newest to oldest.
Select Load sample file on feed activation.
Click Continue. This opens the Feed Editor page.
Select the primary key.
Apply semantic tags to customer records and interaction records, as appropriate.
Under Last updated field, specify which field best describes when records in the table were last updated.
Tip
Choose Generate an “updated” field to have Amperity generate this field. This is the recommended option unless there is a field already in the table that reliably provides this data.
For feeds with customer records (PII data), select Make available to Stitch.
Click Activate. Wait for the feed to finish loading data to the domain table, and then review the sample data for that domain table from the Data Explorer.
Add load operations¶
After the feeds are activated and domain tables are available, add the load operations to the courier used for Streaming Ingest.
Example load operations
Load operations must specify each file that will be pulled to Amperity from Streaming Ingest.
For example:
{
"FEED_ID": [
{
"type": "load",
"file": "file_tag"
}
]
}
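When a courier pulls several feeds, the load operations document can be generated rather than edited by hand. The following Python sketch builds the structure shown above from a mapping of feed ID to file tag. The function name and the feed ID used in the usage note are hypothetical; only the `type` and `file` keys come from the example above.

```python
import json


def build_load_operations(feed_files: dict) -> str:
    """Return load operations JSON for a mapping of feed ID to file tag."""
    operations = {
        feed_id: [{"type": "load", "file": file_tag}]
        for feed_id, file_tag in feed_files.items()
    }
    return json.dumps(operations, indent=2)
```

For example, `build_load_operations({"df-EXAMPLE1": "file_tag"})` produces one load operation for that feed, and an empty mapping produces `{}`, the empty load operation used when getting sample files.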
To add load operations
From the Sources tab, open the menu for the courier that was configured for Streaming Ingest, and then select Edit. The Edit Courier dialog box opens.
Edit the load operations for each of the feeds that were configured for Streaming Ingest so they have the correct feed ID.
Click Save.
Run courier manually¶
Run the courier again. This time, because the load operations are present and the feeds are configured, the courier will pull data from Streaming Ingest.
To run the courier manually
From the Sources tab, open the menu for the courier with updated load operations that is configured for Streaming Ingest, and then select Run. The Run Courier dialog box opens.
Select the load option, either for a specific time period or all available data. Actual data will be loaded to a domain table because the feed is configured.
Click Run.
This time the notification will return a message similar to:
Completed in 5 minutes 12 seconds
Add to courier group¶
A courier group is a list of one (or more) couriers that are run as a group, either ad hoc or as part of an automated schedule. A courier group can be configured to act as a constraint on downstream workflows.
To add the courier to a courier group
From the Sources tab, click Add Courier Group. This opens the Create Courier Group dialog box.
Enter the name of the courier group. For example: “Streaming Ingest”.
Add a cron string to the Schedule field to define a schedule for the courier group.
A schedule defines the frequency at which a courier group runs. All couriers in the same courier group run as a unit and all tasks must complete before a downstream process can be started. The schedule is defined using cron.
Cron syntax specifies the fixed time, date, or interval at which cron will run. Each line represents a job, and is defined like this:
┌───────── minute (0 - 59)
│ ┌─────────── hour (0 - 23)
│ │ ┌───────────── day of the month (1 - 31)
│ │ │ ┌────────────── month (1 - 12)
│ │ │ │ ┌─────────────── day of the week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
│ │ │ │ │
* * * * * command to execute
For example, 30 8 * * * represents “run at 8:30 AM every day” and 30 8 * * 0 represents “run at 8:30 AM every Sunday”. Amperity validates your cron syntax and shows you the results. You may also use crontab guru to validate cron syntax.
Set Status to Enabled.
Specify a time zone.
A courier group schedule is associated with a time zone. The time zone determines the point at which a courier group’s scheduled start time begins. A time zone should be aligned with the time zone of the system from which the data is being pulled.
Use the Use this time zone for file date ranges checkbox to use the selected time zone to look for files. If unchecked, the courier group will use the current time in UTC to look for files to pick up.
Note
The time zone that is chosen for a courier group schedule should consider every downstream business process that requires the data and also the time zone(s) in which the consumers of that data will operate.
Add at least one courier to the courier group. Select the name of the courier from the Courier drop-down. Click + Add Courier to add more couriers.
Click Add a courier group constraint, and then select a courier group from the drop-down list.
A wait time is a constraint placed on a courier group that defines an extended time window for data to be made available at the source location.
Important
A wait time is not required for a bridge.
A courier group typically runs on an automated schedule that expects customer data to be available at the source location within a defined time window. However, in some cases, the customer data may be delayed and isn’t made available within that time window.
For each courier group constraint, apply any offsets.
A courier can be configured to look for files within a range of time that is older than the scheduled time. The scheduled time is in Coordinated Universal Time (UTC), unless the “Use this time zone for file date ranges” checkbox is enabled for the courier group.
This range is typically 24 hours, but may be configured for longer ranges. For example, it’s possible for a data file to be generated with a correct file name and datestamp appended to it, but for that datestamp to represent the previous day because of how an upstream workflow is configured. A wait time helps ensure that the data at the source location is recognized correctly by the courier.
Warning
This range of time may affect couriers in a courier group whether or not they run on a schedule. A manually run courier group may not take its schedule into consideration when determining the date range; only the provided input day(s) to load data from are used as inputs.
Click Save.
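The five-field cron string used for the courier group schedule can be checked before it is pasted into the Schedule field. The following Python sketch splits a cron string into the named fields from the diagram above and range-checks plain numeric values. It is a simplified illustration, not Amperity's validator: wildcards, lists, ranges, and step values are passed through unchecked, and the function and field names are illustrative.

```python
# Field names and numeric ranges, in the order they appear in a cron string.
CRON_FIELDS = (
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 6),
)


def parse_cron(expression: str) -> dict:
    """Split a five-field cron string into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    parsed = {}
    for (name, low, high), value in zip(CRON_FIELDS, parts):
        # Only plain numbers are range-checked; "*" and other syntax pass through.
        if value.isdigit() and not low <= int(value) <= high:
            raise ValueError(f"{name} value {value} is outside {low}-{high}")
        parsed[name] = value
    return parsed
```

For example, `parse_cron("30 8 * * 0")` returns minute `30`, hour `8`, and day of the week `0` (Sunday), with the remaining fields set to `*`.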