Back to the main page
OCI streaming service
- Intro
- Design
- Usage
Intro
The OCI Streaming service is storage solution for ingesting continuous,
high-volume streams of data that you can consume and process in real time.
Streaming can be used for messaging, ingesting high-volume data such as application logs,
and other use cases in which data is produced and processed continually and
sequentially.
Design
In Streaming there is producer and consumer.
- A producer publishes messages to a stream, which is an append-only log.
These messages can be distributed among the partitions (nodes/machines) for scalability.
- A consumer can read messages from streams.
Each message is marked with an offset value, so a consumer can pick up where it left off.
Usage
To create a Stream : Burger menu - Analytics - Streaming
- Create some stream pool first, for ex. admin-stream-pool
- Create stream
ex.
name: zd-stream-log
Messages Endpoint: https://api.cell-1.<oci-region>.streaming.oci.oraclecloud.com
Stream Pool: admin-stream-pool
Number of partitions: 1
Retention: 24h
Read Throughput: 2 MB/s
Write Throughput: 1 MB/s
- To test stream, click "Produce test message", type some message and expect to see "success"
- To see 50 messages in last min, click "Load Messages", and you should see your test.
To emit or put message into stream.
- We can use command in this example, and run it in OCI console shell.
- I am not sure on proper input, so I use this command to see what's JSON file syntax to use.
ZARKO@cloudshell: oci streaming stream message put --generate-param-json-input messages
- The output is :
[
{
"key": "string",
"value": "string"
},
{
"key": "string",
"value": "string"
}
]
- The string is base64 data, hence I'll be using this one for "city,SF,state,CA"
echo city | base64
Y2l0eQo=
echo SF | base64
U0YK
echo state | base64
c3RhdGUK
echo CA | base64
Q0EK
- So my json file (stream-input.json) with stream input data looks:
[
{
"key": "Y2l0eQo=",
"value": "U0YK"
},
{
"key": "c3RhdGUK",
"value": "Q0EK"
}
]
- And my final command to emit into Stream (I know stream-id and endpoint from stream creation action):
ZARKO@cloudshell: oci streaming stream message put --stream-id ocid1.stream.oc1.iad.aaaaaa-shortened-kdxlp4dtkfimh6p6h5aa \
--messages file://stream-input.json --endpoint https://api.cell-1.<oci-region>.streaming.oci.oraclecloud.com
- Recent messages are:
First we need to create a Cursor , in order to read from Stream.
ZARKO@cloudshell: oci streaming stream cursor create-cursor --partition 0 --stream-id ocid1.stream.oc1.iad.aaaa-shortened-kfimh6p6h5aa \
--type TRIM_HORIZON --endpoint https://api.cell-1.<oci-region>.streaming.oci.oraclecloud.com
... output is the cursor ...
{
"data": {
"value": "eyJjdXJzb-shortened-XJzb3JUeXBlIjoicGFydGl0aW9uIn0="
}
}
Now use the cursor to get messages from Stream.
ZARKO@cloudshell: oci streaming stream message get --stream-id ocid1.stream.oc1.iad.aaaa-shortened-p6h5aa \
--endpoint https://api.cell-1.<oci-region>.streaming.oci.oraclecloud.com \
--cursor eyJjdXJzb-shortened-XJzb3JUeXBlIjoicGFydGl0aW9uIn0=
The message reading is like this, so we still need to find way to read this properly.
{
"data": [
{
"key": "Y2l0eQo=",
"offset": 70,
"partition": "0",
"stream": "zd-stream-log",
"timestamp": "2020-03-05T00:44:20.740000+00:00",
"value": "U0YK"
},
{
"key": "countrw=",
"offset": 71,
"partition": "0",
"stream": "zd-stream-log",
"timestamp": "2020-03-05T00:44:20.740000+00:00",
"value": "usY="
},
-shortened-
],
"opc-next-cursor": "eyJjdXJ-shortened-SI6InBhcnRpdGlvbiJ9"
}
This can help to figure some values:
echo U0YK | base64 -d
SF
Back to the main page