
Streams

New Name

"Streams" were formerly called "triggers".

Create a stream

PUT /api/v1/datasets/<project>/<dataset_name>/streams
Permissions required: Streams admin, View labels
curl -X PUT 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "stream": {
      "comment_filter": {
        "user_properties": {
          "number:Spend": {
            "maximum": 100000,
            "minimum": 100
          },
          "number:Transactions": {
            "one_of": [
              1
            ]
          },
          "string:Country": {
            "one_of": [
              "uk",
              "de"
            ]
          }
        }
      },
      "description": "Used by ACME RPA to create tickets for disputes.",
      "model": {
        "label_thresholds": [
          {
            "name": [
              "Some Label"
            ],
            "threshold": 0.37
          },
          {
            "name": [
              "Another Label"
            ],
            "threshold": 0.46
          },
          {
            "name": [
              "Parent Label",
              "Child Label"
            ],
            "threshold": 0.41
          }
        ],
        "version": 8
      },
      "name": "dispute",
      "title": "Collateral Disputes"
    }
  }'

Streams enable persistent, stateful iteration through comments in a dataset, with predicted labels and entities computed using a pinned model.

Once a stream is created, the fetch and advance methods can be used to iterate through comments.

Name | Type | Required | Description
name | string | yes | API name for the stream, used in URLs. Must be unique within a dataset and must match [A-Za-z0-9-_]{1,256}.
title | string | no | One-line human-readable title for the stream.
description | string | no | A longer description of the stream.
model | Model | no | If specified, comments fetched from this stream will contain predictions from a pinned model.
comment_filter | CommentFilter | no | If specified, comments not matching the filter will not be returned. See the Comment Filter section under "Fetch comments from a stream" for details on how the comment filter affects the results returned by the stream.
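
Because the stream name ends up in URLs, it can be worth checking it on the client before sending the request. The following is a minimal Python sketch of such a check. The helper name `is_valid_stream_name` is hypothetical, and the pattern is the one from the table above with the hyphen moved to the end of the character class so that it is read as a literal.

```python
import re

# Pattern taken from the "name" row of the table. The hyphen sits at the end
# of the character class so the regex engine treats it as a literal character.
STREAM_NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,256}")


def is_valid_stream_name(name: str) -> bool:
    """Return True if `name` satisfies the documented naming rule."""
    return STREAM_NAME_RE.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ("dispute", "Collateral Disputes", ""):
        print(repr(candidate), is_valid_stream_name(candidate))
```

This only covers the character and length rule; uniqueness within a dataset can only be checked by the API itself.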

Where Model has the following format:

Name | Type | Required | Description
version | integer | yes | A model version that has been pinned via the Models page.
label_thresholds | array<LabelThreshold> | no | If set, only values matching the given label_thresholds are returned. If not set, all labels and all prediction values will be returned.

Where LabelThreshold has the following format:

Name | Type | Required | Description
name | array<string> | yes | The name of the label to be returned, formatted as a list of hierarchical labels. For instance, the label "Some Label" will have the format ["Some Label"], and the label "Parent Label > Child Label" will have the format ["Parent Label", "Child Label"].
threshold | number | yes | The confidence threshold to use for the label (a number between 0.0 and 1.0). The label will only be returned for a comment if its prediction is above this threshold.
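
The mapping from a display name such as "Parent Label > Child Label" to the list form can be sketched in Python as follows. The function `label_threshold` is a hypothetical helper, and splitting on ">" is an assumption that only holds if individual label names never contain that character.

```python
from typing import Any, Dict


def label_threshold(label: str, threshold: float, separator: str = ">") -> Dict[str, Any]:
    """Build one LabelThreshold object from a display name like 'Parent > Child'."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    # Split the hierarchy and drop the whitespace around each level.
    parts = [part.strip() for part in label.split(separator)]
    if any(not part for part in parts):
        raise ValueError("label contains an empty hierarchy level: %r" % label)
    return {"name": parts, "threshold": threshold}


if __name__ == "__main__":
    model = {
        "version": 8,
        "label_thresholds": [
            label_threshold("Some Label", 0.37),
            label_threshold("Parent Label > Child Label", 0.41),
        ],
    }
    print(model)
```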

Where CommentFilter has the following format:

Name | Type | Required | Description
user_properties | UserPropertyFilter | no | A filter that applies to the user properties of a comment. For more on user properties, see the Comment Reference.

The UserPropertyFilter is a map of user property name to filter. String properties may be filtered to values in a set ({"one_of": ["val_1", "val_2"]}). Number properties may be filtered either to values in a set ({"one_of": [123, 456]}) or to a range ({"minimum": 123, "maximum": 456}).
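
To make the filter semantics concrete, here is a small Python sketch that evaluates a UserPropertyFilter against a comment's user properties on the client. It is an illustration only, not the platform's implementation: the function name `matches_filter` is hypothetical, and it assumes that range bounds are inclusive and that a comment lacking a filtered property does not match.

```python
from typing import Any, Dict


def matches_filter(user_properties: Dict[str, Any],
                   property_filter: Dict[str, Dict[str, Any]]) -> bool:
    """Return True if every rule in `property_filter` accepts the comment."""
    for name, rule in property_filter.items():
        if name not in user_properties:
            return False  # assumption: a missing property never matches
        value = user_properties[name]
        if "one_of" in rule and value not in rule["one_of"]:
            return False
        if "minimum" in rule and value < rule["minimum"]:
            return False  # assumption: the lower bound is inclusive
        if "maximum" in rule and value > rule["maximum"]:
            return False  # assumption: the upper bound is inclusive
    return True


# The filter from the "Create a stream" example.
EXAMPLE_FILTER = {
    "number:Spend": {"minimum": 100, "maximum": 100000},
    "number:Transactions": {"one_of": [1]},
    "string:Country": {"one_of": ["uk", "de"]},
}

if __name__ == "__main__":
    comment_properties = {
        "number:Spend": 500,
        "number:Transactions": 1,
        "string:Country": "uk",
    }
    print(matches_filter(comment_properties, EXAMPLE_FILTER))
```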

Update a stream

PUT /api/v1/datasets/<project>/<dataset_name>/streams
Permissions required: Modify streams, View labels
curl -X PUT 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "stream": {
      "comment_filter": {
        "user_properties": {
          "number:Spend": {
            "maximum": 100000,
            "minimum": 100
          },
          "number:Transactions": {
            "one_of": [
              1
            ]
          },
          "string:Country": {
            "one_of": [
              "uk",
              "de"
            ]
          }
        }
      },
      "description": "Used by ACME RPA to create tickets for disputes.",
      "model": {
        "label_thresholds": [
          {
            "name": [
              "Some Label"
            ],
            "threshold": 0.37
          },
          {
            "name": [
              "Another Label"
            ],
            "threshold": 0.46
          },
          {
            "name": [
              "Parent Label",
              "Child Label"
            ],
            "threshold": 0.41
          }
        ],
        "version": 8
      },
      "name": "dispute",
      "title": "Collateral Disputes"
    }
  }'

Get a stream by name

GET /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>
Permissions required: View labels, View streams
curl -X GET 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute' \
  -H "Authorization: Bearer $REINFER_TOKEN"

Get all streams

GET /api/v1/datasets/<project>/<dataset_name>/streams
Permissions required: View labels, View streams
curl -X GET 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams' \
  -H "Authorization: Bearer $REINFER_TOKEN"

Delete a stream

DELETE /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>
Permissions required: Streams admin, View labels
curl -X DELETE 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute' \
  -H "Authorization: Bearer $REINFER_TOKEN"

Fetch comments from a stream

POST /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>/fetch
Permissions required: Consume streams, View labels, View sources
curl -X POST 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute/fetch' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "size": 8
  }'

Once a stream is created, it can be queried to fetch comments and their predicted labels and entities. Below are some important aspects to keep in mind when fetching comments from a stream.

Comment Queue

When a stream is created, its initial position is set to be equal to its creation time. If needed, you can set the stream to a different position (either forwards or backwards in time) using the reset endpoint. The stream will return comments starting from its current position. The position of the comment in the comment queue is determined by the order in which the comments were uploaded.

Advancing Your Position in the Queue

Since the stream always returns comments starting from its current position, advance it to the next position after each fetch request by using the advance endpoint. This way the API guarantees at-least-once processing of all comments: if your application fails while processing a batch, it will pick up the same batch on restart. Note that since an application can successfully process a comment but fail at the advance step, it is important to handle seeing a comment multiple times.

Depending on your application design, you can choose between advancing the stream once for the whole batch (using the batch's sequence_id contained in the response), or advancing it for each individual comment (using the comment's sequence_id contained in the response).
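
The fetch, process, advance cycle with one advance per batch can be sketched in Python as below. This is a minimal illustration, not a client library: `consume_batches` and `InMemoryStream` are hypothetical names, the in-memory class merely stands in for the fetch and advance endpoints so the loop can run without network access, and its integer-like sequence IDs are an invention of the stand-in (real sequence IDs are opaque strings).

```python
from typing import Any, Callable, Dict, List


def consume_batches(fetch: Callable[[int], Dict[str, Any]],
                    advance: Callable[[str], None],
                    handle: Callable[[Dict[str, Any]], None],
                    size: int = 8,
                    max_batches: int = 100) -> int:
    """Fetch, process and acknowledge batches; return the number of results handled."""
    processed = 0
    for _ in range(max_batches):
        batch = fetch(size)
        for result in batch["results"]:
            handle(result)  # if this raises, the batch is never acknowledged
            processed += 1
        # Advance only after the whole batch was handled, so that a crash
        # mid-batch causes the same batch to be fetched again on restart.
        advance(batch["sequence_id"])
        if batch.get("is_end_sequence", False):
            break
    return processed


class InMemoryStream:
    """Hypothetical stand-in for the fetch and advance endpoints."""

    def __init__(self, uids: List[str]) -> None:
        self._uids = list(uids)
        self._position = 0

    def fetch(self, size: int) -> Dict[str, Any]:
        chunk = self._uids[self._position:self._position + size]
        end = self._position + len(chunk)
        results = [{"comment": {"uid": uid}, "sequence_id": str(self._position + i + 1)}
                   for i, uid in enumerate(chunk)]
        return {"status": "ok", "filtered": 0, "results": results,
                "sequence_id": str(end), "is_end_sequence": end >= len(self._uids)}

    def advance(self, sequence_id: str) -> None:
        self._position = int(sequence_id)


if __name__ == "__main__":
    stream = InMemoryStream(["a", "b", "c"])
    count = consume_batches(stream.fetch, stream.advance, print, size=2)
    print("handled", count, "results")
```

To advance per comment instead, call `advance(result["sequence_id"])` inside the inner loop after each successful `handle`; this narrows the window in which work is repeated at the cost of one extra request per comment.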

Comment Filter

If a comment_filter was specified when creating the stream, comments not matching the filter are excluded from the results but still count towards the requested size. As a result, you may see responses in which all size comments are filtered out and the results array is empty. In the example below, we request a batch of 8 comments, all of which are filtered out.

{
  "filtered": 8,
  "results": [],
  "sequence_id": "qs8QcHIBAADJ1p3W2FtmBB3QiOJsCJlR",
  "status": "ok"
}

To prevent this from happening, you can set the optional max_filtered parameter, which prevents filtered comments from counting towards the requested size.

Request Format

Name | Type | Required | Description
size | number | yes | The number of comments to fetch for this stream. Will return fewer if it reaches end of batch or if comments are filtered out according to the comment filter. Max value is 1024.
max_filtered | number | no | Convenience parameter for streams with a comment filter. When provided, up to max_filtered filtered comments will not count towards the requested size. This is useful if you expect a large number of comments to not match the filter. Has no effect on streams without a comment filter. Max value is 1024.
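
A fetch request body that respects the limits in the table above could be assembled as in the following Python sketch. The helper `fetch_request_body` is hypothetical, and the lower bounds (at least 1 for size, at least 0 for max_filtered) are assumptions, since the table only states the maximum values.

```python
from typing import Dict, Optional

MAX_SIZE = 1024          # documented maximum for "size"
MAX_MAX_FILTERED = 1024  # documented maximum for "max_filtered"


def fetch_request_body(size: int, max_filtered: Optional[int] = None) -> Dict[str, int]:
    """Build the JSON body for a fetch request, validating the documented limits."""
    if not 1 <= size <= MAX_SIZE:  # the lower bound of 1 is an assumption
        raise ValueError("size must be between 1 and %d" % MAX_SIZE)
    body = {"size": size}
    if max_filtered is not None:
        if not 0 <= max_filtered <= MAX_MAX_FILTERED:  # lower bound is an assumption
            raise ValueError("max_filtered must be between 0 and %d" % MAX_MAX_FILTERED)
        body["max_filtered"] = max_filtered
    return body


if __name__ == "__main__":
    print(fetch_request_body(8))
    print(fetch_request_body(8, max_filtered=1024))
```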

Response Format

The response contains a batch of comments (of at most size comments). If the stream was configured with a pinned model version, the response also contains predicted labels and entities for each comment.

Name | Type | Description
status | string | ok if the request is successful, or error in case of an error. See the Overview to learn more about error responses.
filtered | number | Number of comments that were filtered out according to a comment filter. If the stream was created without a filter, this number will always be 0.
sequence_id | string | The batch sequence ID. Used to acknowledge processing of this batch and advance stream to the next batch.
is_end_sequence | bool | True if there were no additional results in the stream at the time the request was made. False otherwise.
results | array<Result> | An array containing result objects.

Where Result has the following format:

Name | Type | Description
comment | Comment | Comment data. For a detailed explanation, see the Comment Reference.
sequence_id | string | The comment's sequence ID. Used to acknowledge processing of this comment and advance stream to the next comment.
labels | array<Label> | An array containing predicted labels for this comment, where Label has the format described here.
entities | array<Entity> | An array containing predicted entities for this comment, where Entity has a format described here.
label_properties | array<LabelProperty> | An array containing predicted label properties for this comment, where LabelProperty has a format described here.

Advance a stream

POST /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>/advance
Permissions required: Consume streams, View labels
curl -X POST 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute/advance' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "sequence_id": "qs8QcHIBAADJ1p3W2FtmBB3QiOJsCJlR"
  }'

Each fetch request returns a sequence_id which represents the position it has fetched up to. Passing that same sequence_id to the advance API ensures that the next fetch performed on the stream starts from this position. You can advance to the next batch by using the current batch's sequence_id. Alternatively, you can advance to the next comment by using the current comment's sequence_id.

Since an application can successfully process a comment but fail at the advance step, it is important to handle seeing a comment multiple times on the client application side.

Name | Type | Required | Description
sequence_id | string | yes | The sequence ID to advance the stream to.
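
One way to tolerate seeing a comment more than once is to make the processing step idempotent. The Python sketch below wraps a handler so that a comment already recorded as processed is skipped. The name `make_idempotent` is hypothetical, the sketch assumes each result exposes the comment's uid at `result["comment"]["uid"]`, and the in-memory set would need to be replaced by durable storage for the guarantee to survive a restart.

```python
from typing import Any, Callable, Dict, Set


def make_idempotent(handle: Callable[[Dict[str, Any]], None],
                    processed_uids: Set[str]) -> Callable[[Dict[str, Any]], bool]:
    """Wrap `handle` so that each comment uid is processed at most once."""

    def wrapper(result: Dict[str, Any]) -> bool:
        uid = result["comment"]["uid"]  # assumed location of the comment uid
        if uid in processed_uids:
            return False  # already handled on a previous attempt, skip it
        handle(result)
        # Record the uid only after the handler succeeded.
        processed_uids.add(uid)
        return True

    return wrapper


if __name__ == "__main__":
    seen: Set[str] = set()
    handler = make_idempotent(lambda r: print("ticket for", r["comment"]["uid"]), seen)
    duplicate = {"comment": {"uid": "18ba5ce699f8da1f.abcdef0123456789"}}
    handler(duplicate)
    handler(duplicate)  # second delivery of the same comment is ignored
```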

Reset a stream

POST /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>/reset
Permissions required: Consume streams, View labels
curl -X POST 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute/reset' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "to_comment_created_at": "2020-06-03T16:05:00"
  }'

A stream can be reset to move its position backwards or forwards in time, either to repeat previously returned comments or to skip comments. The timestamp used to reset a stream refers to the time the comments were uploaded (i.e. the comment's created_at property, rather than its timestamp property).

Name | Type | Required | Description
to_comment_created_at | string | yes | An ISO-8601 timestamp.

The response will contain the sequence_id corresponding to the new stream position.
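
The reset body can be produced from a Python `datetime` as sketched below. The helper name `reset_request_body` is hypothetical. The sketch formats a naive datetime in the same shape as the example request; how the API interprets timestamps without a UTC offset is not stated here, so treat the time zone handling as an open assumption.

```python
from datetime import datetime
from typing import Dict


def reset_request_body(created_at: datetime) -> Dict[str, str]:
    """Build the JSON body for a reset request from a datetime."""
    # isoformat(timespec="seconds") yields e.g. "2020-06-03T16:05:00" for a
    # naive datetime; an aware datetime would also carry its UTC offset.
    return {"to_comment_created_at": created_at.isoformat(timespec="seconds")}


if __name__ == "__main__":
    print(reset_request_body(datetime(2020, 6, 3, 16, 5)))
```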

Tag an exception

PUT /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>/exceptions
Permissions required: Consume streams, View sources
curl -X PUT 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute/exceptions' \
  -H "Authorization: Bearer $REINFER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "exceptions": [
      {
        "metadata": {
          "type": "No Prediction"
        },
        "uid": "18ba5ce699f8da1f.abcdef0123456789"
      },
      {
        "metadata": {
          "type": "Wrong Prediction"
        },
        "uid": "18ba5ce699f8da1f.0123456789abcdef"
      }
    ]
  }'

This endpoint allows you to tag comments as exceptions in the platform, so that a model trainer can review and label them in order to improve the model. We recommend tagging comments for which the model returned no predictions, as well as comments for which the model returned wrong predictions. For help with designing the exception handling flow, see the Integration Guide.

Name | Type | Required | Description
exceptions | array<Exception> | yes | A list of exceptions.

Where Exception has the following format:

Name | Type | Required | Description
uid | string | yes | The uid of the comment that should be tagged as an exception.
metadata | Metadata | yes | An object containing exception metadata.

Where Metadata has the following format:

Name | Type | Required | Description
type | string | yes | The exception type will be available as a filter property in the Re:infer UI. The value can be an arbitrary string. Please choose a short, easy-to-understand string such as "No Prediction" or "Wrong Prediction".
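
As an illustration of the "no predictions" case, the following Python sketch turns fetched results into a request body for this endpoint. The function `no_prediction_exceptions` is hypothetical, and it assumes that a result without predictions has an empty or missing labels array and that the comment's uid is available at `result["comment"]["uid"]`. Detecting wrong predictions depends on your downstream process and is not covered.

```python
from typing import Any, Dict, List


def no_prediction_exceptions(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build an exceptions request body for results that carry no predicted labels."""
    exceptions = [
        {"uid": result["comment"]["uid"], "metadata": {"type": "No Prediction"}}
        for result in results
        if not result.get("labels")  # empty or missing labels array
    ]
    return {"exceptions": exceptions}


if __name__ == "__main__":
    fetched = [
        {"comment": {"uid": "18ba5ce699f8da1f.abcdef0123456789"}, "labels": []},
        {"comment": {"uid": "18ba5ce699f8da1f.0123456789abcdef"},
         "labels": [{"name": ["Some Label"]}]},
    ]
    print(no_prediction_exceptions(fetched))
```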

Untag an exception

DELETE /api/v1/datasets/<project>/<dataset_name>/streams/<stream_name>/exceptions?uid=<comment_uid0>[&uid=<comment_uid1>...]
Permissions required: Consume streams, View sources
curl -X DELETE 'https://<my_api_endpoint>/api/v1/datasets/project1/collateral/streams/dispute/exceptions?uid=18ba5ce699f8da1f.abcdef0123456789&uid=18ba5ce699f8da1f.0123456789abcdef' \
  -H "Authorization: Bearer $REINFER_TOKEN"

Exceptions can be untagged using the comment UID.
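
Since the uid parameter is repeated once per comment, the URL is easiest to build with a list of key-value pairs. The Python sketch below uses only the standard library; the helper `untag_url` and the placeholder host `example.invalid` are hypothetical.

```python
from typing import List
from urllib.parse import quote, urlencode


def untag_url(endpoint: str, project: str, dataset: str, stream: str,
              uids: List[str]) -> str:
    """Build the DELETE URL that untags the given comment uids."""
    if not uids:
        raise ValueError("at least one comment uid is required")
    path = "/api/v1/datasets/{}/{}/streams/{}/exceptions".format(
        quote(project, safe=""), quote(dataset, safe=""), quote(stream, safe=""))
    # A list of pairs makes urlencode emit one "uid=" parameter per comment.
    query = urlencode([("uid", uid) for uid in uids])
    return "{}{}?{}".format(endpoint.rstrip("/"), path, query)


if __name__ == "__main__":
    print(untag_url(
        "https://example.invalid", "project1", "collateral", "dispute",
        ["18ba5ce699f8da1f.abcdef0123456789", "18ba5ce699f8da1f.0123456789abcdef"],
    ))
```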