Difference between revisions of "Feed Handlers"
(78 intermediate revisions by 5 users not shown) | |||
Line 96: | Line 96: | ||
<span style="font-family: courier;">q replay.q –p server_port</span> | <span style="font-family: courier;">q replay.q –p server_port</span> | ||
− | =Configuring AMI to stream | + | =Kafka Feed Handler = |
+ | |||
+ | ==Configuring AMI to stream messages over Kafka== | ||
==Overview== | ==Overview== | ||
1. Configure AMI properties to the Kafka server. | 1. Configure AMI properties to the Kafka server. | ||
− | 2 | + | 2. Restart AMI and start streaming data using Kafka. |
− | |||
− | |||
==Step 1: Configure AMI properties to the Kafka server== | ==Step 1: Configure AMI properties to the Kafka server== | ||
Line 109: | Line 109: | ||
AMI needs to know where and how to deserialize messages sent over Kafka. The following properties need to be updated to achieve that: | AMI needs to know where and how to deserialize messages sent over Kafka. The following properties need to be updated to achieve that: | ||
− | < | + | <div style="font-family: courier;"> |
− | ami.relay.fh.active=ssocket,kafka | + | ami.relay.fh.active=ssocket,kafka <span style="font-family: courier new; color: green;">''#required, must be exact''</span> <br> |
+ | ami.relay.fh.kafka.class=com.f1.ami.relay.fh.kafka.AmiKafkaFH <span style="font-family: courier new; color: green;">''#required, must be exact''</span> <br> | ||
+ | ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaJsonDeserializer <span style="font-family: courier new; color: green;">''#Value deserializer class for JSON | ||
+ | ''</span> <br> | ||
+ | ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer <span style="font-family: courier new; color: green;">''#Value deserializer class for Avro''</span> <br> | ||
+ | ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer <span style="font-family: courier new; color: green;">''#Value deserializer class for Protobuf''</span> <br> | ||
+ | ami.relay.fh.kafka.props.value.deserializer=<fully_qualified_class_name> <span style="font-family: courier new; color: green;">''#Any other custom value deserializer class is supported. The custom class needs to be provided. | ||
+ | ''</span> <br> | ||
+ | ami.relay.fh.kafka.props.helper.factory.class=<fully_qualified_class_name> <span style="font-family: courier new; color: green;">''#set the override Kafka helper factory class to map the custom Kafka helper to your custom deserializer class''</span> <br> | ||
− | |||
− | # insert the hostname of your kafka server here | + | ami.relay.fh.kafka.props.bootstrap.servers=<HOSTNAME>:9092 <span style="font-family: courier new; color: green;">''# insert the hostname of your kafka server here''</span> <br> |
− | ami.relay.fh.kafka.props. | + | ami.relay.fh.kafka.props.topics=<TOPIC_NAME(S)> <span style="font-family: courier new; color: green;">''# insert the comma delimited topic names being used for kafka e.g. topic-1,topic-2,topic-3''</span> <br> |
+ | ami.relay.fh.kafka.props.group.id=<GROUP ID> <span style="font-family: courier new; color: green;"> ''# insert a consumer group id string (in case other processes are consuming from the same topics additionally, use that group id).E.g. test-group''</span> <br> | ||
+ | ami.relay.fh.kafka.props.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer <br> | ||
+ | ami.relay.fh.kafka.props.enable.auto.commit=true <span style="font-family: courier new; color: green;">''# OPTIONAL''</span> <br> | ||
+ | ami.relay.fh.kafka.props.schema.registry=http://<HOSTNAME>:8081 <span style="font-family: courier new; color: green;">''# insert the hostname of your kafka server running schema registry here (OPTIONAL) ''</span> <br> | ||
+ | ami.relay.fh.kafka.props.client.id=<CLIENTID> <span style="font-family: courier new; color: green;">''# kafka id to identify your consumer (OPTIONAL) ''</span> <br> | ||
+ | ami.relay.fh.kafka.props.auto.offset.reset=<OPTION> <span style="font-family: courier new; color: green;">''# What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Options include earliest, latest, none. Where earliest consumes from beginning of the topic partition while latest consumes from end of topic partition which is default. (OPTIONAL)''</span> <br> | ||
− | |||
− | |||
− | ami.relay.fh.kafka.props. | + | ami.relay.fh.kafka.props.sasl.mechanism=<SASL_MECHANISM> <span style="font-family: courier new; color: green;">''# SASL mechanism used for client connections (OPTIONAL - defaults to GSSAPI)''</span> <br> |
− | + | ami.relay.fh.kafka.props.security.protocol=<PROTOCOL> <span style="font-family: courier new; color: green;">''# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. (OPTIONAL - defaults to PLAINTEXT)''</span> <br> | |
− | # | + | ami.relay.fh.kafka.props.sasl.jaas.config=<SASL_JAAS_CONFIG> <span style="font-family: courier new; color: green;">''# JAAS login context parameters for SASL connections in the format used by JAAS configuration files (OPTIONAL - defaults to null)''</span> <br> |
− | ami.relay.fh.kafka.props. | + | ami.relay.fh.kafka.props.sasl.login.callback.handler.class=<CLASSNAME> <span style="font-family: courier new; color: green;">''# Fully-qualified class name for custom sasl login callback handler (OPTIONAL)''</span> <br> |
− | + | ami.relay.fh.kafka.props.use.record.key=<true/false> <span style="font-family: courier new; color: green;">''# AMI uses kafka record keys as AMI message keys by default (same key Kafka records get upserted). Set to false to ignore Kafka record keys and send to AMI without key (OPTIONAL - default true''</span> <br> | |
− | ami.relay.fh.kafka.props. | + | ami.relay.fh.kafka.props.enable.debug.log=<true/false> <span style="font-family: courier new; color: green;">''# Enable debug logging for consumer polls (default false) ''</span> <br> |
− | + | ami.relay.fh.kafka.props.<CUSTOM_PROPERTY>=<CUSTOM_VALUE> <span style="font-family: courier new; color: green;"> ''#Custom properties can also be passed to the Kafka Consumer Client that you want to connect to using the following format: (Refer to Kafka Consumer Configuration documentation on available properties) ''</span> <br> | |
− | # | + | ami.relay.fh.kafka.props.max.poll.records=5000 <span style="font-family: courier new; color: green;"> ''# e.g. max number of records per poll (default 500) '' </span> <br> |
− | ami.relay.fh.kafka.props. | + | </div> |
− | |||
− | # | ||
− | ami.relay.fh.kafka.props. | ||
− | < | ||
− | |||
− | Custom properties can also be passed to the Kafka Client that you want to connect to using the following format: | ||
− | |||
− | < | ||
− | |||
− | |||
− | |||
− | ami.relay.fh.kafka.props. | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | ==Step | + | ==Step 2: Restart AMI and start streaming data using Kafka== |
When AMI starts up, it will automatically connect to the kafka server. | When AMI starts up, it will automatically connect to the kafka server. | ||
Line 154: | Line 148: | ||
1. Create a new window: Windows -> New Window | 1. Create a new window: Windows -> New Window | ||
− | |||
− | |||
2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization. | 2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization. | ||
− | + | 3. Create a Table: Under Realtime Feeds, select the desired feed(s) (with the same topic name as defined in the properties file. Each topic has its separate table) and click Next >>. | |
− | + | [[File:ChooseRealtime.png|800px]] | |
− | |||
− | [[File: | ||
4. Select the desired columns you want to display in the table and select Finish | 4. Select the desired columns you want to display in the table and select Finish | ||
− | [[File: | + | [[File:CreatingRealtime.png|800px]] |
The created Realtime Table: | The created Realtime Table: | ||
[[File:Kafka.05.jpg]] | [[File:Kafka.05.jpg]] | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
The Data Modeler can also be used to create data models for the realtime feeds for more customizability. | The Data Modeler can also be used to create data models for the realtime feeds for more customizability. | ||
Line 501: | Line 333: | ||
<span style="font-family: courier new; color: blue;">ami.relay.fh.active</span>=hazelcast<br> | <span style="font-family: courier new; color: blue;">ami.relay.fh.active</span>=hazelcast<br> | ||
− | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.class</span>=com.f1. | + | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.class</span>=com.f1.AmiHazelcastFHMapPortable<br> |
<span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.url</span>=localhost:5701<br> | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.url</span>=localhost:5701<br> | ||
<span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.maps</span>=hzMap1,hzMap2<br> | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.maps</span>=hzMap1,hzMap2<br> | ||
Line 544: | Line 376: | ||
===Writing a Hazelcast Portable Serializer class for AMI=== | ===Writing a Hazelcast Portable Serializer class for AMI=== | ||
− | Below is an example of a Hazelcast Portable class implementation for deserialization for AMI: | + | Below is an example of a Hazelcast Portable class implementation for deserialization for AMI:<br> |
+ | The feedhandler will extract all non-static primitive data type variables declared in the class provided as its table columns: which are <span style="font-family: courier new; color: red;">sampleInt</span>, <span style="font-family: courier new; color: red;">sampleDouble</span> and <span style="font-family: courier new; color: red;">sampleString</span> in the case of the example below. | ||
<syntaxhighlight lang="java"> | <syntaxhighlight lang="java"> | ||
Line 612: | Line 445: | ||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
+ | |||
+ | ==Subscribing to Hazelcast Maps with HazelcastJsonValue value types== | ||
+ | |||
+ | ===Properties=== | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.active</span>=hazelcast<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.class</span>=com.f1.AmiHazelcastFHMap<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.url</span>=localhost:5701<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.maps</span>=hzMap1,hzMap2<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.cluster</span>=dev<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#get existing values in Hazelcast map: if true, sets true for all - default false</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.getexistingvalues</span>=false | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#get existing values in Hazelcast map: for individual maps</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap1.getexistingvalues</span>=true<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap2.getexistingvalues</span>=false<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#table name in AMI (optional - map name will be used if not provided)</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap1.tablename</span>=hzTable1<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap2.tablename</span>=hzTable2<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#column mapping for key/value pairs from Hazelcast to AMI e.g. col1=int,col2=double,... (OPTIONAL - feedhandler will automatically convert to string type if not provided)</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap1.mapping</span>=id=int,name=string<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzMap2.mapping</span>=price=double,quantity=long<br> | ||
+ | |||
+ | ====Additional (Optional) Properties==== | ||
+ | <span style="font-family: courier new; color: green;">#Hazelcast credentials - if any</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.username</span>=demo<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.password</span>=demo123<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#SSL default=off</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.sslenabled</span>=true<br> | ||
+ | |||
+ | ==Subscribing to Hazelcast Reliable Topics (valid JSON values only)== | ||
+ | |||
+ | ===Properties=== | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.active</span>=hazelcast<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.class</span>=com.f1.AmiHazelcastFHTopic<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.url</span>=localhost:5701<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.topics</span>=hzTopic1,hzTopic2<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.cluster</span>=dev<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#table name in AMI (optional - topic name will be used if not provided)</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzTopic1.tablename</span>=hzTable1<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzTopic2.tablename</span>=hzTable2<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#column mapping for key/value pairs from topic JSON value to AMI e.g. col1=int,col2=double,... (OPTIONAL - feedhandler will automatically convert to string type if not provided)</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzTopic1.mapping</span>=id=int,name=string<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.hzTopic2.mapping</span>=price=double,quantity=long<br> | ||
+ | |||
+ | ====Additional (Optional) Properties==== | ||
+ | <span style="font-family: courier new; color: green;">#Hazelcast credentials - if any</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.username</span>=demo<br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.password</span>=demo123<br> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#SSL default=off</span><br> | ||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.hazelcast.props.sslenabled</span>=true<br> | ||
+ | |||
+ | =AWS SQS Feed Handler= | ||
+ | == Configuration == | ||
+ | |||
+ | # In order to set up the SQS feedhandler, unzip the tar.gz or zip file and copy the contents of the folder into the '''amione/lib''' directory. | ||
+ | # For generating AWS credentials, two methods are supported - (1) via '''STS''', or (2) via '''Profiles'''. | ||
+ | ## '''STS''': Ensure that both ami.relay.fh.sqs.props.roleArn and ami.relay.fh.sqs.props.roleSessionName properties are set. If either is unset, the feed handler will default to using profiles. Ensure that you have a role with read access to the relevant AWS SQS queue. Note: this method will use your default credentials to assume the provided role (see: https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html for more information on how this is acquired) | ||
+ | ## '''Profiles''': Ensure that you have a valid AWS credential tagged to a profile configured with read access to the relevant AWS SQS queue, see Step 2 of: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-setting-up.html For configuring your profile for the first time, run <span style="font-family: courier new; color: gray;"> aws configure sso </span> and pass in the requested information to generate the required files and credentials for the SDK. Subsequently, running <span style="font-family: courier new; color: gray;">aws sso login –-profile profilename </span> will refresh your credentials. | ||
+ | # Add the respective properties to your local.properties file according to the section below | ||
+ | |||
+ | === Message Format === | ||
+ | This feedhandler accepts only valid JSON as an input source from SQS and expects it to be in a single map where each key represents a column on a table and its value a single column value. | ||
+ | |||
+ | Sample valid message: | ||
+ | <syntaxhighlight> | ||
+ | { | ||
+ | “colA”: “colAValue”, | ||
+ | “colB”: 123, | ||
+ | … | ||
+ | } | ||
+ | </syntaxhighlight> | ||
+ | |||
+ | Note that by default, all column types are inferred to be Strings unless explicitly defined | ||
+ | In the <span style="font-family: courier new; color: blue;"> ami.relay.fh.sqs.props.tableMapping </span> property using a colName=colType,... syntax. | ||
+ | |||
+ | Null values in the JSON message are skipped. | ||
+ | |||
+ | Valid types are as follows: | ||
+ | |||
+ | {| class="wikitable" | ||
+ | ! Underlying Column Type !! Valid Property Name (Case Insensitive) | ||
+ | |- | ||
+ | | String|| str, string | ||
+ | |- | ||
+ | | Integer || int, integer | ||
+ | |- | ||
+ | | Short|| short | ||
+ | |- | ||
+ | | Long || long | ||
+ | |- | ||
+ | | Float || float | ||
+ | |- | ||
+ | | Double || double | ||
+ | |- | ||
+ | | Character || char, character | ||
+ | |- | ||
+ | | Boolean || bool, boolean | ||
+ | |} | ||
+ | |||
+ | == Properties == | ||
+ | <span style="font-family: courier new; color: green;">#Required - use this to configure one or more FHs</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.active=sqs</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - used to start the FH </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.start=true</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - must match exactly </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.class=com.f1.ami.relay.fh.sqs.AmiAwsSqsFH</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - name of AMI table for data to be streamed into </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.tableName=tableName</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - AWS profile name, uses default profile otherwise </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.profileName=AdministratorAccess-123456</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - AWS profile file, uses default AWS location otherwise </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.profileFile=/location/to/file</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - AWS role ARN </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.roleArn=arn | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - AWS role session name </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.roleSessionName=sessionName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - URL of the SQS Queue </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.queueUrl=https://sqs.us-east-1.amazonaws.com/123/queueName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - AWS region of the SQS Queue </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.queueRegion=us-east-1</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - If true, deletes read messages from the queue</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.deleteAfterRead=true</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Specify column name and its underlying types, see Message Format section for more info </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.tableMapping=colA=String,colB=int </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Number of messages to read at a time (defaults to 5) </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.sqs.props.readCount=5</span> | ||
+ | |||
+ | =Solace Feed Handler= | ||
+ | |||
+ | == Message Format == | ||
+ | |||
+ | This feedhandler accepts either JSON or protobuf messages as an input source. For protobuf messages, use the <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.protobufClass</span> property to configure the parser. For JSON messages, it expects it to be in a single map where each key represents a column on a table and its value a single column value. | ||
+ | |||
+ | Sample valid JSON message: | ||
+ | <syntaxhighlight> | ||
+ | { | ||
+ | “colA”: “colAValue”, | ||
+ | “colB”: 123, | ||
+ | … | ||
+ | } | ||
+ | </syntaxhighlight> | ||
+ | |||
+ | Note that by default, all column types are inferred to be Strings unless explicitly defined | ||
+ | In the <span style="font-family: courier new; color: blue;"> ami.relay.fh.solace.props.tableMapping </span> property using a colName=colType,... syntax. | ||
+ | |||
+ | Null values in the JSON message are skipped. | ||
+ | |||
+ | |||
+ | Sample valid protobuf message in a sample.proto file: | ||
+ | |||
+ | <syntaxhighlight> | ||
+ | package protobuf; | ||
+ | option java_package = "com.sample.protobuf"; | ||
+ | message Sample { | ||
+ | required string name = 1; | ||
+ | required int32 id = 2; | ||
+ | } | ||
+ | </syntaxhighlight> | ||
+ | |||
+ | Users can download the protobuf releases from GitHub and run the protoc.exe on the proto file to generate the protobuf parser class. | ||
+ | |||
+ | |||
+ | Valid types are as follows: | ||
+ | |||
+ | {| class="wikitable" | ||
+ | ! Underlying Column Type !! Valid Property Name (Case Insensitive) | ||
+ | |- | ||
+ | | String|| str, string | ||
+ | |- | ||
+ | | Integer || int, integer | ||
+ | |- | ||
+ | | Short|| short | ||
+ | |- | ||
+ | | Long || long | ||
+ | |- | ||
+ | | Float || float | ||
+ | |- | ||
+ | | Double || double | ||
+ | |- | ||
+ | | Character || char, character | ||
+ | |- | ||
+ | | Boolean || bool, boolean | ||
+ | |} | ||
+ | |||
+ | == Properties == | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - use this to configure one or more FHs </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.active=solace </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - used to start the FH </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.start=true </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - must match exactly </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.class=com.f1.ami.relay.fh.solace.AmiSolaceFH </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - name of AMI table for data to be streamed into </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.tableName=solaceTable </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Specify column name and its underlying types, see below for more info </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.tableMapping=colA=String,colB=int </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Specify a solace property file to be used </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.propertyFilepath=/location/to/property </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Semi-optional - Specifies a topic to subscribe to, either this or the queue must be specified, or optionally both can be specified (Uses Direct Message subscription) </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.topic=try-me </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Semi-optional - Specifies a queue to subscribe to, either this or the topic must be specified, or optionally both can be specified (Uses Persistent Message subscription) </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.queue=queueName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specify queue type to be used, valid inputs (non-case sensitive) are: DurableExclusive, DurableNonExclusive, and NonDurableExclusive. Defaults to NonDurableExclusive </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.queueType=DurableExclusive </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specifies how the received message should be parsed, valid inputs (non-case sensitive)are: json, protobuf. Defaults to json </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.parseMode=json </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - required if protobof is used as the parsing mode, specifies the protobuf parser class to be used </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;"> ami.relay.fh.solace.props.protobufParserClass=parser class </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - required if protobof is used as the parsing mode, do not need to use fully qualified name since it searches within protobufParserClass</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;"> ami.relay.fh.solace.props.protobufClass=parser </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specifies the host address of the solace instance </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.host=localhost:55555 </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specifies the vpn name of the solace queue/topic </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.vpnName=default </span> | ||
+ | |||
+ | |||
+ | |||
+ | '''Authentication (at least one mode has to be in use, defaults to basic)''' | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Client Certificate Authentication (Optional) - Required if TLS is in use </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.keystoreUrl=url </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.keystorePassword=password </span> | ||
+ | |||
+ | |||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Kerberos Authentication (Optional) </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.kerberosInstanceName=instanceName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.kerberosJaasContextName=contextName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.kerberosUsernameOnBroker=username </span> | ||
+ | |||
+ | |||
+ | |||
+ | <span style="font-family: courier new; color: green;">#OAuth 2.0 Authentication </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.oauthAccessToken=token </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - defaults to authenticating with no issuer identifier if not specified </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.oauthIssuerIdentifier=identifier </span> | ||
+ | |||
+ | |||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Basic Authentication </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.username=admin </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.password=admin </span> | ||
+ | |||
+ | |||
+ | |||
+ | '''Connection''' | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Use TLS for connection, valid inputs are true/false, defaults to false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.useTLS=false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.tlsTruststorePassword=password </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Ignore expiration of cert, defaults to false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.solace.props.tlsIgnoreExpiration=false </span> | ||
+ | |||
+ | |||
+ | =Tibco EMS Feed Handler= | ||
+ | |||
+ | == Message Format == | ||
+ | |||
+ | This feedhandler only accepts JSON protobuf messages as an input source and expects the message to be in a single map where each key represents a column on a table and its value a single column value. | ||
+ | |||
+ | Sample valid JSON message: | ||
+ | <syntaxhighlight> | ||
+ | { | ||
+ | “colA”: “colAValue”, | ||
+ | “colB”: 123, | ||
+ | … | ||
+ | } | ||
+ | </syntaxhighlight> | ||
+ | |||
+ | Note that by default, all column types are inferred to be Strings unless explicitly defined | ||
+ | In the <span style="font-family: courier new; color: blue;"> ami.relay.fh.tibcoems.props.tableMapping </span> property using a colName=colType,... syntax. | ||
+ | |||
+ | Null values in the JSON message are skipped. | ||
+ | |||
+ | Valid types are as follows: | ||
+ | |||
+ | {| class="wikitable" | ||
+ | ! Underlying Column Type !! Valid Property Name (Case Insensitive) | ||
+ | |- | ||
+ | | String|| str, string | ||
+ | |- | ||
+ | | Integer || int, integer | ||
+ | |- | ||
+ | | Short|| short | ||
+ | |- | ||
+ | | Long || long | ||
+ | |- | ||
+ | | Float || float | ||
+ | |- | ||
+ | | Double || double | ||
+ | |- | ||
+ | | Character || char, character | ||
+ | |- | ||
+ | | Boolean || bool, boolean | ||
+ | |} | ||
+ | |||
+ | == Properties == | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - use this to configure one or more FHs </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.active=tibcoems</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - used to start the FH </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.start=true </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - must match exactly </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.class=com.f1.ami.relay.fh.tibcoems.AmiTibcoEMSFH </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - name of AMI table for data to be streamed into </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.tableName=tableName</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - Specify column name and its underlying types </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.tableMapping=colA=String,colB=int </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Semi-optional - Specifies a topic to subscribe to, either this or the queue must be specified </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.topic=topicName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Semi-optional - Specifies a queue to subscribe to, either this or the topic must be specified </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.queue=queueName </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specifies if the JNDI protocol should be used for the connection, defaults to false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.useJNDI=true/false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - if using the JNDI protocol, should TLS be used, defaults to false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.useTLS=true/false </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - client id to be used </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;"> ami.relay.fh.tibcoems.props.clientID=id </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - specifies the host address of the tibco ems instance </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;"> ami.relay.fh.tibcoems.props.serverUrl=localhost:1234 </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - specifies the username to be used </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.username=username </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Required - specifies the password to be used </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.password=password</span> | ||
+ | |||
+ | <span style="font-family: courier new; color: green;">#Optional - specifies if/how messages should be acknowledged. Valid inputs are: auto,client,dups_ok,explicit_client,explicit_client_dups_ok,no. Defaults to auto </span> | ||
+ | |||
+ | <span style="font-family: courier new; color: blue;">ami.relay.fh.tibcoems.props.ackMode=auto </span> |
Latest revision as of 15:47, 27 February 2024
KDB Feed Handler
Prerequisites
Be sure that KDB is running before adding as a data source. Fully tested versions include 3.x
Configuring KX live streaming inside AMI Relays
KX live streaming integrates with kdb+tick via the AMI Feed handler mechanism. AMI can be configured to connect and subscribe to KX ticker plants. This enables AMI to receive all real-time updates from the ticker plant. Optionally, AMI can also be configured to recover from ticker plant log files before consuming real time updates.
Ami Relay Property Settings for KX feed handler
The following properties should be set in ami relays' config/local.properties
As with all feed handlers, add one uniquely named entry for each KX feed handler to the ami.relay.fh.active property. Be sure to include the default ssocket,cfg and cmd feed handlers. For example if you have only one kx feed handler:
- ami.relay.fh.active,=cfg,cmd,kx1
Then, for each KX feed handler, include the following properties. NOTE: Be sure to include the proper feed handler name in the property name:
- ami.relay.fh.kx1.start=true #must be set to true, otherwise it will be disabled
- ami.relay.fh.kx1.class=com.f1.ami.relay.fh.AmiKxFH #required, must be exact
- ami.relay.fh.kx1.props.kxUrl=hostname:port #location of the ticker plant
- ami.relay.fh.kx1.props.kxUsername=username #optional
- ami.relay.fh.kx1.props.kxPassword=password #optional
- ami.relay.fh.kx1.props.amiId=KX_APP_ID #indicates what the application id of messages coming from this ticker plant will be mapped to (See AMI Backend API Manual for explanation on application ids)
- ami.relay.fh.kx1.props.replayUrl=hostname:port #optional, only if recovery is required. See KDB Ticker plant Recovery steps below on how to configure and start kdb replay process.
- ami.relay.fh.kx1.props.tableKeyMap=kxTableName=AMITableName #No need to declare AMITable upfront, this directive is gonna create it for you and populate the data. If not specified, AMI table will have the same name as kx table
- ami.relay.fh.kx1.props.date.format=yyyy-MM-dd'T'HH:mm:ss The format of the data Column
- ami.relay.fh.kx1.props.date.timezone=<Your_Time_Zone> The time zone that you want your time to appear as
- ami.relay.fh.kx1.props.subscribeQuery=subscription_kx_query #optional
- Default is: .u.sub[`;`]; (.u `i`L;.u.t!{0!meta x} each .u.t)
Example Config:
ami.relay.fh.active=ssocket,kx1
ami.relay.fh.kx1.start=true
ami.relay.fh.kx1.class=com.f1.ami.relay.fh.AmiKxFH
ami.relay.fh.kx1.props.kxUrl=localhost:1235
ami.relay.fh.kx1.props.kxUsername=demo
ami.relay.fh.kx1.props.kxPassword=demo123
ami.relay.fh.kx1.props.amiId=KX_APP_ID
ami.relay.fh.kx1.props.replayUrl=localhost:1234
ami.relay.fh.kx1.props.tableKeyMap=table1=col1,col2,coln|table2=col1, col2
Example Setup
1. Suppose on the kx side, we have a publisher and a subscriber, where inside the publisher we have a kx table called kx
Refer to this link for detailed instructions on how to set up a publisher and a subscriber in kx:
https://code.kx.com/q/kb/publish-subscribe/
2. Launch publisher.q on port 1000 and subscriber.q on port 1001
3. on the Ami side, in the amione/config/local.properties
ami.relay.fh.active=kx1
ami.relay.fh.kx1.start=true
ami.relay.fh.kx1.class=com.f1.ami.relay.fh.AmiKxFH
ami.relay.fh.kx1.props.kxUrl=localhost:1000
ami.relay.fh.kx1.props.amiId=KX_APP_ID2
ami.relay.fh.kx1.props.subscribeQuery=.u.sub[`kx;`];(.u `i`L;.u.t!{0!meta x} each .u.t)
Note here that the kxUrl is the publisher port number and the argument inside u.sub is the kx table name.
Also note that we don't need to create the table in AMI upfront because the feedhandler will create it for you.
KDB Ticker Plant Recovery Steps
In order to support replay you must startup a KDB replay process that will be used by the AMI Relay feed handler to recover data from the ticker plant log file before resuming processing of real time events from the ticker plant.
IMPORTANT: This KDB process must have read access to the ticker plant log file.
Setup process: Create a script called replay.q with the following contents:
upd:{[t;x] (neg first .z.w)(`upd;t;x)}
replay:{if[null first x;:0];-11!x}
Startup Process: (where server_port matches port in ami.relay.fh.kx1.props.replayUrl):
q replay.q –p server_port
Kafka Feed Handler
Configuring AMI to stream messages over Kafka
Overview
1. Configure AMI properties to the Kafka server.
2. Restart AMI and start streaming data using Kafka.
Step 1: Configure AMI properties to the Kafka server
AMI needs to know where and how to deserialize messages sent over Kafka. The following properties need to be updated to achieve that:
ami.relay.fh.active=ssocket,kafka #required, must be exact
ami.relay.fh.kafka.class=com.f1.ami.relay.fh.kafka.AmiKafkaFH #required, must be exact
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaJsonDeserializer #Value deserializer class for JSON
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer #Value deserializer class for Avro
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer #Value deserializer class for Protobuf
ami.relay.fh.kafka.props.value.deserializer=<fully_qualified_class_name> #Any other custom value deserializer class is supported. The custom class needs to be provided.
ami.relay.fh.kafka.props.helper.factory.class=<fully_qualified_class_name> #set the override Kafka helper factory class to map the custom Kafka helper to your custom deserializer class
ami.relay.fh.kafka.props.bootstrap.servers=<HOSTNAME>:9092 # insert the hostname of your kafka server here
ami.relay.fh.kafka.props.topics=<TOPIC_NAME(S)> # insert the comma delimited topic names being used for kafka e.g. topic-1,topic-2,topic-3
ami.relay.fh.kafka.props.group.id=<GROUP ID> # insert a consumer group id string (in case other processes are consuming from the same topics additionally, use that group id).E.g. test-group
ami.relay.fh.kafka.props.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
ami.relay.fh.kafka.props.enable.auto.commit=true # OPTIONAL
ami.relay.fh.kafka.props.schema.registry=http://<HOSTNAME>:8081 # insert the hostname of your kafka server running schema registry here (OPTIONAL)
ami.relay.fh.kafka.props.client.id=<CLIENTID> # kafka id to identify your consumer (OPTIONAL)
ami.relay.fh.kafka.props.auto.offset.reset=<OPTION> # What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Options include earliest, latest, none. Where earliest consumes from beginning of the topic partition while latest consumes from end of topic partition which is default. (OPTIONAL)
ami.relay.fh.kafka.props.sasl.mechanism=<SASL_MECHANISM> # SASL mechanism used for client connections (OPTIONAL - defaults to GSSAPI)
ami.relay.fh.kafka.props.security.protocol=<PROTOCOL> # Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. (OPTIONAL - defaults to PLAINTEXT)
ami.relay.fh.kafka.props.sasl.jaas.config=<SASL_JAAS_CONFIG> # JAAS login context parameters for SASL connections in the format used by JAAS configuration files (OPTIONAL - defaults to null)
ami.relay.fh.kafka.props.sasl.login.callback.handler.class=<CLASSNAME> # Fully-qualified class name for custom sasl login callback handler (OPTIONAL)
ami.relay.fh.kafka.props.use.record.key=<true/false> # AMI uses kafka record keys as AMI message keys by default (same key Kafka records get upserted). Set to false to ignore Kafka record keys and send to AMI without key (OPTIONAL - default true
ami.relay.fh.kafka.props.enable.debug.log=<true/false> # Enable debug logging for consumer polls (default false)
ami.relay.fh.kafka.props.<CUSTOM_PROPERTY>=<CUSTOM_VALUE> #Custom properties can also be passed to the Kafka Consumer Client that you want to connect to using the following format: (Refer to Kafka Consumer Configuration documentation on available properties)
ami.relay.fh.kafka.props.max.poll.records=5000 # e.g. max number of records per poll (default 500)
Step 2: Restart AMI and start streaming data using Kafka
When AMI starts up, it will automatically connect to the kafka server.
To see the data being streamed, a Realtime Table/Visualization needs to be created:
1. Create a new window: Windows -> New Window
2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization.
3. Create a Table: Under Realtime Feeds, select the desired feed(s) (with the same topic name as defined in the properties file. Each topic has its separate table) and click Next >>.
4. Select the desired columns you want to display in the table and select Finish
The created Realtime Table:
The Data Modeler can also be used to create data models for the realtime feeds for more customizability.
AMI Deephaven Feedhandler
Requirements
1. Docker
2. JDK 17
Getting Started with Deephaven
1. Install sample python-example containers from Deephaven from Quick start Install Deephaven
a. curl
b. docker-compose pull
c. docker-compose up
2. If you run into errors with the grpc container, downgrade the containers in the "docker-compose.yml" file to 0.16.1
3. Go to http://localhost:10000/ide/ on the browser
4. Execute the following commands in the deephaven ide
from deephaven import time_table sample = time_table('00:00:02')
This will create a ticking table that will insert a new timestamp record every 2 seconds.
Installing the feedhandler plugin to AMI
1. Place "DeephavenFH.jar" and all other jar files in the "dependencies" directory under "/amione/lib/"
2. Copy the properties from the local.properties file to your own local.properties
3. For JDK17 compatibility, use the attached start.sh file or add the following parameters to the java launch command
--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.text=ALL-UNNAMED --add-opens java.base/sun.net=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/sun.security.action=ALL-UNNAMED
4. Add the following to the java launch parameters to the start.sh file as well
-DConfiguration.rootFile=dh-defaults.prop
5. Launch AMI
Configuring the feedhandler
There are 3 properties that can be configured from the local.properties file:
1. ami.relay.fh.deephaven.props.url - subscription source url
2. ami.relay.fh.deephaven.props.topics- tables to be subscribed to (delimited by commas)
3. ami.relay.fh.deephaven.props.messagesize - (optional) maximum size of messages from the source. Default is 1000000.
For example, to subscribe to a localhost deephaven server with tables Sample1 & Sample2 with a maximum message size of 2000000:
ami.relay.fh.deephaven.props.url=localhost ami.relay.fh.deephaven.props.topics=Sample1,Sample2 ami.relay.fh.deephaven.props.messagesize=2000000
FIX Messages
Use this link to access FIX's documentation.
AMPS Feed Handler
The AMI AMPS Realtime feed handler connects to AMPS at startup and begins an AMPS subscription using the AMPS Command interface. The configuration properties are used to configure the AMPS commands’ command id, topic, option, filter, etc.
Properties
The following properties are set in config/local.properties configuration file. As with all feed handlers, add one uniquely named entry for each AMPS feed handler to the ami.relay.fh.active property. Be sure to include the default ssocket feed handler.
For example, if you have one AMPS feed handler, let's name it amps1:
ami.relay.fh.active=ssocket,amps1
Then, for each AMPS feed handler, include the following properties, where amps1 matches the name in the ami.relay.fh.active property.
Available Properties
ami.relay.fh.amps1.class=com.f1.ami.relay.fh.amps.AmiAmpsFH #required, must be exact
ami.relay.fh.amps1.start=true #default is true, set to false to disable this feed handler
ami.relay.fh.amps1.props.amiId=amps1 #overrides the application id that messages coming from this feed handler will be mapped to. Default is the handler name, which in this case is amps1 (See AMI Backend API Manual for explanation on application ids)
ami.relay.fh.amps1.props.url=tcp://flux.3forge.net:9007/amps/nvfix #AMPS URL. Required
ami.relay.fh.amps1.props.topics= #AMPS topic(s) to subscribe to. Required
ami.relay.fh.amps1.props.filters= #AMPS filters(s). One filter per topic
ami.relay.fh.amps1.props.commands=sow_and_delta_subscribe #AMPS command(s) to send on startup. Default is sow_and_delta_subscribe
ami.relay.fh.amps1.props.options=oof #AMPS option(s) for command, oof (out of focus) is default
ami.relay.fh.amps1.props.timeout=60000 #AMPS timeout for commands, default is 60000
ami.relay.fh.amps1.props.batchsize=10000 #AMPS batchsizes, default is 10000
ami.relay.fh.amps1.props.clientname=AMPS2AMIRELAY #client name when logging into AMPS, default is AMPSS2AMIRELAY
ami.relay.fh.amps1.props.sow_key_mappings=I #What should AMPS sow key be mapped to, default is I, use blank string for no mapping
Minimal Config Example
ami.relay.fh.active=ssocket,amps1 ami.relay.fh.amps1.class=com.f1.ami.relay.fh.amps.AmiAmpsFH ami.relay.fh.amps1.props.url=tcp://localhost:9007/amps/json ami.relay.fh.amps1.props.topics=TOPIC1
Multiple Topics with Filters Subscription Example
ami.relay.fh.active=ssocket,amps1
ami.relay.fh.amps1.class=com.f1.ami.relay.fh.amps.AmiAmpsFH
ami.relay.fh.amps1.props.url=tcp://localhost:9007/amps/json
ami.relay.fh.amps1.props.topics=ORDERS,EXECUTIONS
ami.relay.fh.amps1.props.filters=/status=’open’,/MsgType=’fill’
Logs
Search for com.f1.ami.relay.fh.amps.AmiAmpsFH in the log/AmiOne.log file for information regarding configuration, initialization of the adapter, sending of AMPS commands and any errors.
For Example:
grep com.f1.ami.relay.fh.amps.AmiAmpsFH log/AmiOne.log | ami.relay.fh.amps1.class=com.f1.ami.relay.fh.amps.AmiAmpsFH INF 20190704-09:17:32.802 EST5EDT [main] com.f1.ami.relay.AmiRelayServer::initFH Initializing fh - com.f1.ami.relay.fh.amps.AmiAmpsFH@42a3abe9 INF 20190704-09:17:32.964 EST5EDT [amps connector] com.f1.ami.relay.fh.amps.AmiAmpsFH::run executed command=sow_and_delta_subscribe, topic=FIXMSG, filter=null, options=oof, timeout=60000, batchSize=10000, sowkeymap=I
If you’d like to log each message received from AMPS, add the following to your config/local.properties:
speedlogger.stream.com.f1.ami.relay.fh.amps.AmiAmpsFH=BASIC_APPENDER;FILE_SINK;FINE
And you will start seeing message in the format:
FNE 20190704-09:23:57.909 EST5EDT [AMPS Java Client Background Reader Thread 86] com.f1.ami.relay.fh.amps.AmiAmpsFH::invoke AMPS Message received: Message{ … }
Hazelcast Feedhandler
The Hazelcast Feedhandler consists of 3 types of subscription to Hazelcast:
- 1. Maps with Portable class serialization
- 2. Maps (HazelcastJsonValue type map values only)
- 3. Reliable Topic
Subscribing to Hazelcast Maps with Portable class serialization
Properties
Below is an example snippet of the local.properties setup:
ami.relay.fh.active=hazelcast
ami.relay.fh.hazelcast.class=com.f1.AmiHazelcastFHMapPortable
ami.relay.fh.hazelcast.props.url=localhost:5701
ami.relay.fh.hazelcast.props.maps=hzMap1,hzMap2
ami.relay.fh.hazelcast.props.cluster=dev
#get existing values in Hazelcast map: if true, sets true for all - default false
ami.relay.fh.hazelcast.props.getexistingvalues=false
#get existing values in Hazelcast map: for individual maps
ami.relay.fh.hazelcast.props.hzMap1.getexistingvalues=true
ami.relay.fh.hazelcast.props.hzMap2.getexistingvalues=false
#comma-delimited list of factories and corresponding factory IDs
ami.relay.fh.hazelcast.props.portablefactoryclasses=com.f1.ami.relay.fh.hazelcast.portable.PortableFactory
ami.relay.fh.hazelcast.props.portablefactoryids=333
#table name in AMI (optional - map name will be used if not provided)
ami.relay.fh.hazelcast.props.hzMap1.tablename=hzTable1
#your fully-qualified Hazelcast Portable Serializer class here
ami.relay.fh.hazelcast.props.hzMap1.portableclass=com.f1.ami.relay.fh.hazelcast.portable.PortableSerializer1
#needs to match portable class id
ami.relay.fh.hazelcast.props.hzMap1.portableclassid=1
#needs to match portable factory id
ami.relay.fh.hazelcast.props.hzMap1.portablefactoryid=333
ami.relay.fh.hazelcast.props.hzMap2.tablename=hzTable2
ami.relay.fh.hazelcast.props.hzMap2.portableclass=com.f1.ami.relay.fh.hazelcast.portable.PortableSerializer2
ami.relay.fh.hazelcast.props.hzMap2.portableclassid=2
ami.relay.fh.hazelcast.props.hzMap2.portablefactoryid=333
Additional (Optional) Properties
#Hazelcast credentials - if any
ami.relay.fh.hazelcast.props.username=demo
ami.relay.fh.hazelcast.props.password=demo123
#SSL default=off
ami.relay.fh.hazelcast.props.sslenabled=true
Writing a Hazelcast Portable Serializer class for AMI
Below is an example of a Hazelcast Portable class implementation for deserialization for AMI:
The feedhandler will extract all non-static primitive data type variables declared in the class provided as its table columns: which are sampleInt, sampleDouble and sampleString in the case of the example below.
package com.f1.ami.relay.fh.hazelcast.portable;
import java.io.IOException;
import com.f1.ami.relay.fh.hazelcast.AmiHazelcastPortableIDSetter;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
public class SamplePortable implements Portable, AmiHazelcastPortableIDSetter {
static int ID = 0;
static int FACTORY_ID = 0;
private Integer sampleInt;
private Double sampleDouble;
private String sampleString;
SamplePortable() {
sampleInt = null;
sampleDouble = null;
sampleString = null;
}
SamplePortable(Integer sampleInt, Double sampleDouble, String sampleString) {
this.sampleInt = sampleInt;
this.sampleDouble = sampleDouble;
this.sampleString = sampleString;
}
@Override
public void setPortableClassID(int id) {
SamplePortable.ID = id;
}
@Override
public void setPortableFactoryID(int id) {
SamplePortable.FACTORY_ID = id;
}
@Override
public int getFactoryId() {
return FACTORY_ID;
}
@Override
public int getClassId() {
return ID;
}
@Override
public void writePortable(PortableWriter writer) throws IOException {
writer.writeInt("sampleInt", this.sampleInt);
writer.writeDouble("sampleDouble", this.sampleDouble);
writer.writeString("sampleString", this.sampleString);
}
@Override
public void readPortable(PortableReader reader) throws IOException {
this.sampleInt = reader.readInt("sampleInt");
this.sampleDouble = reader.readDouble("sampleDouble");
this.sampleString = reader.readString("sampleString");
}
}
Subscribing to Hazelcast Maps with HazelcastJsonValue value types
Properties
ami.relay.fh.active=hazelcast
ami.relay.fh.hazelcast.class=com.f1.AmiHazelcastFHMap
ami.relay.fh.hazelcast.props.url=localhost:5701
ami.relay.fh.hazelcast.props.maps=hzMap1,hzMap2
ami.relay.fh.hazelcast.props.cluster=dev
#get existing values in Hazelcast map: if true, sets true for all - default false
ami.relay.fh.hazelcast.props.getexistingvalues=false
#get existing values in Hazelcast map: for individual maps
ami.relay.fh.hazelcast.props.hzMap1.getexistingvalues=true
ami.relay.fh.hazelcast.props.hzMap2.getexistingvalues=false
#table name in AMI (optional - map name will be used if not provided)
ami.relay.fh.hazelcast.props.hzMap1.tablename=hzTable1
ami.relay.fh.hazelcast.props.hzMap2.tablename=hzTable2
#column mapping for key/value pairs from Hazelcast to AMI e.g. col1=int,col2=double,... (OPTIONAL - feedhandler will automatically convert to string type if not provided)
ami.relay.fh.hazelcast.props.hzMap1.mapping=id=int,name=string
ami.relay.fh.hazelcast.props.hzMap2.mapping=price=double,quantity=long
Additional (Optional) Properties
#Hazelcast credentials - if any
ami.relay.fh.hazelcast.props.username=demo
ami.relay.fh.hazelcast.props.password=demo123
#SSL default=off
ami.relay.fh.hazelcast.props.sslenabled=true
Subscribing to Hazelcast Reliable Topics (valid JSON values only)
Properties
ami.relay.fh.active=hazelcast
ami.relay.fh.hazelcast.class=com.f1.AmiHazelcastFHTopic
ami.relay.fh.hazelcast.props.url=localhost:5701
ami.relay.fh.hazelcast.props.topics=hzTopic1,hzTopic2
ami.relay.fh.hazelcast.props.cluster=dev
#table name in AMI (optional - topic name will be used if not provided)
ami.relay.fh.hazelcast.props.hzTopic1.tablename=hzTable1
ami.relay.fh.hazelcast.props.hzTopic2.tablename=hzTable2
#column mapping for key/value pairs from topic JSON value to AMI e.g. col1=int,col2=double,... (OPTIONAL - feedhandler will automatically convert to string type if not provided)
ami.relay.fh.hazelcast.props.hzTopic1.mapping=id=int,name=string
ami.relay.fh.hazelcast.props.hzTopic2.mapping=price=double,quantity=long
Additional (Optional) Properties
#Hazelcast credentials - if any
ami.relay.fh.hazelcast.props.username=demo
ami.relay.fh.hazelcast.props.password=demo123
#SSL default=off
ami.relay.fh.hazelcast.props.sslenabled=true
AWS SQS Feed Handler
Configuration
- In order to set up the SQS feedhandler, unzip the tar.gz or zip file and copy the contents of the folder into the amione/lib directory.
- For generating AWS credentials, two methods are supported - (1) via STS, or (2) via Profiles.
- STS: Ensure that both ami.relay.fh.sqs.props.roleArn and ami.relay.fh.sqs.props.roleSessionName properties are set. If either is unset, the feed handler will default to using profiles. Ensure that you have a role with read access to the relevant AWS SQS queue. Note: this method will use your default credentials to assume the provided role (see: https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html for more information on how this is acquired)
- Profiles: Ensure that you have a valid AWS credential tagged to a profile configured with read access to the relevant AWS SQS queue, see Step 2 of: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-setting-up.html For configuring your profile for the first time, run aws configure sso and pass in the requested information to generate the required files and credentials for the SDK. Subsequently, running aws sso login –-profile profilename will refresh your credentials.
- Add the respective properties to your local.properties file according to the section below
Message Format
This feedhandler accepts only valid JSON as an input source from SQS and expects it to be in a single map where each key represents a column on a table and its value a single column value.
Sample valid message:
{
“colA”: “colAValue”,
“colB”: 123,
…
}
Note that by default, all column types are inferred to be Strings unless explicitly defined In the ami.relay.fh.sqs.props.tableMapping property using a colName=colType,... syntax.
Null values in the JSON message are skipped.
Valid types are as follows:
Underlying Column Type | Valid Property Name (Case Insensitive) |
---|---|
String | str, string |
Integer | int, integer |
Short | short |
Long | long |
Float | float |
Double | double |
Character | char, character |
Boolean | bool, boolean |
Properties
#Required - use this to configure one or more FHs
ami.relay.fh.active=sqs
#Required - used to start the FH
ami.relay.fh.sqs.start=true
#Required - must match exactly
ami.relay.fh.sqs.class=com.f1.ami.relay.fh.sqs.AmiAwsSqsFH
#Required - name of AMI table for data to be streamed into
ami.relay.fh.sqs.props.tableName=tableName
#Optional - AWS profile name, uses default profile otherwise
ami.relay.fh.sqs.props.profileName=AdministratorAccess-123456
#Optional - AWS profile file, uses default AWS location otherwise
ami.relay.fh.sqs.props.profileFile=/location/to/file
#Optional - AWS role ARN
ami.relay.fh.sqs.props.roleArn=arn
#Optional - AWS role session name
ami.relay.fh.sqs.props.roleSessionName=sessionName
#Required - URL of the SQS Queue
ami.relay.fh.sqs.props.queueUrl=https://sqs.us-east-1.amazonaws.com/123/queueName
#Required - AWS region of the SQS Queue
ami.relay.fh.sqs.props.queueRegion=us-east-1
#Optional - If true, deletes read messages from the queue
ami.relay.fh.sqs.props.deleteAfterRead=true
#Optional - Specify column name and its underlying types, see Message Format section for more info
ami.relay.fh.sqs.props.tableMapping=colA=String,colB=int
#Optional - Number of messages to read at a time (defaults to 5)
ami.relay.fh.sqs.props.readCount=5
Solace Feed Handler
Message Format
This feedhandler accepts either JSON or protobuf messages as an input source. For protobuf messages, use the ami.relay.fh.solace.props.protobufClass property to configure the parser. For JSON messages, it expects it to be in a single map where each key represents a column on a table and its value a single column value.
Sample valid JSON message:
{
“colA”: “colAValue”,
“colB”: 123,
…
}
Note that by default, all column types are inferred to be Strings unless explicitly defined In the ami.relay.fh.solace.props.tableMapping property using a colName=colType,... syntax.
Null values in the JSON message are skipped.
Sample valid protobuf message in a sample.proto file:
package protobuf;
option java_package = "com.sample.protobuf";
message Sample {
required string name = 1;
required int32 id = 2;
}
Users can download the protobuf releases from GitHub and run the protoc.exe on the proto file to generate the protobuf parser class.
Valid types are as follows:
Underlying Column Type | Valid Property Name (Case Insensitive) |
---|---|
String | str, string |
Integer | int, integer |
Short | short |
Long | long |
Float | float |
Double | double |
Character | char, character |
Boolean | bool, boolean |
Properties
#Required - use this to configure one or more FHs
ami.relay.fh.active=solace
#Required - used to start the FH
ami.relay.fh.solace.start=true
#Required - must match exactly
ami.relay.fh.solace.class=com.f1.ami.relay.fh.solace.AmiSolaceFH
#Required - name of AMI table for data to be streamed into
ami.relay.fh.solace.props.tableName=solaceTable
#Optional - Specify column name and its underlying types, see below for more info
ami.relay.fh.solace.props.tableMapping=colA=String,colB=int
#Optional - Specify a solace property file to be used
ami.relay.fh.solace.props.propertyFilepath=/location/to/property
#Semi-optional - Specifies a topic to subscribe to, either this or the queue must be specified, or optionally both can be specified (Uses Direct Message subscription)
ami.relay.fh.solace.props.topic=try-me
#Semi-optional - Specifies a queue to subscribe to, either this or the topic must be specified, or optionally both can be specified (Uses Persistent Message subscription)
ami.relay.fh.solace.props.queue=queueName
#Optional - specify queue type to be used, valid inputs (non-case sensitive) are: DurableExclusive, DurableNonExclusive, and NonDurableExclusive. Defaults to NonDurableExclusive
ami.relay.fh.solace.props.queueType=DurableExclusive
#Optional - specifies how the received message should be parsed, valid inputs (non-case sensitive)are: json, protobuf. Defaults to json
ami.relay.fh.solace.props.parseMode=json
#Optional - required if protobof is used as the parsing mode, specifies the protobuf parser class to be used
ami.relay.fh.solace.props.protobufParserClass=parser class
#Optional - required if protobof is used as the parsing mode, do not need to use fully qualified name since it searches within protobufParserClass
ami.relay.fh.solace.props.protobufClass=parser
#Optional - specifies the host address of the solace instance
ami.relay.fh.solace.props.host=localhost:55555
#Optional - specifies the vpn name of the solace queue/topic
ami.relay.fh.solace.props.vpnName=default
Authentication (at least one mode has to be in use, defaults to basic)
#Client Certificate Authentication (Optional) - Required if TLS is in use
ami.relay.fh.solace.props.keystoreUrl=url
ami.relay.fh.solace.props.keystorePassword=password
#Kerberos Authentication (Optional)
ami.relay.fh.solace.props.kerberosInstanceName=instanceName
ami.relay.fh.solace.props.kerberosJaasContextName=contextName
ami.relay.fh.solace.props.kerberosUsernameOnBroker=username
#OAuth 2.0 Authentication
ami.relay.fh.solace.props.oauthAccessToken=token
#Optional - defaults to authenticating with no issuer identifier if not specified
ami.relay.fh.solace.props.oauthIssuerIdentifier=identifier
#Basic Authentication
ami.relay.fh.solace.props.username=admin
ami.relay.fh.solace.props.password=admin
Connection
#Optional - Use TLS for connection, valid inputs are true/false, defaults to false
ami.relay.fh.solace.props.useTLS=false
ami.relay.fh.solace.props.tlsTruststorePassword=password
#Optional - Ignore expiration of cert, defaults to false
ami.relay.fh.solace.props.tlsIgnoreExpiration=false
Tibco EMS Feed Handler
Message Format
This feedhandler only accepts JSON protobuf messages as an input source and expects the message to be in a single map where each key represents a column on a table and its value a single column value.
Sample valid JSON message:
{
“colA”: “colAValue”,
“colB”: 123,
…
}
Note that by default, all column types are inferred to be Strings unless explicitly defined In the ami.relay.fh.tibcoems.props.tableMapping property using a colName=colType,... syntax.
Null values in the JSON message are skipped.
Valid types are as follows:
Underlying Column Type | Valid Property Name (Case Insensitive) |
---|---|
String | str, string |
Integer | int, integer |
Short | short |
Long | long |
Float | float |
Double | double |
Character | char, character |
Boolean | bool, boolean |
Properties
#Required - use this to configure one or more FHs
ami.relay.fh.active=tibcoems
#Required - used to start the FH
ami.relay.fh.tibcoems.start=true
#Required - must match exactly
ami.relay.fh.tibcoems.class=com.f1.ami.relay.fh.tibcoems.AmiTibcoEMSFH
#Required - name of AMI table for data to be streamed into
ami.relay.fh.tibcoems.props.tableName=tableName
#Optional - Specify column name and its underlying types
ami.relay.fh.tibcoems.props.tableMapping=colA=String,colB=int
#Semi-optional - Specifies a topic to subscribe to, either this or the queue must be specified
ami.relay.fh.tibcoems.props.topic=topicName
#Semi-optional - Specifies a queue to subscribe to, either this or the topic must be specified
ami.relay.fh.tibcoems.props.queue=queueName
#Optional - specifies if the JNDI protocol should be used for the connection, defaults to false
ami.relay.fh.tibcoems.props.useJNDI=true/false
#Optional - if using the JNDI protocol, should TLS be used, defaults to false
ami.relay.fh.tibcoems.props.useTLS=true/false
#Required - client id to be used
ami.relay.fh.tibcoems.props.clientID=id
#Required - specifies the host address of the tibco ems instance
ami.relay.fh.tibcoems.props.serverUrl=localhost:1234
#Required - specifies the username to be used
ami.relay.fh.tibcoems.props.username=username
#Required - specifies the password to be used
ami.relay.fh.tibcoems.props.password=password
#Optional - specifies if/how messages should be acknowledged. Valid inputs are: auto,client,dups_ok,explicit_client,explicit_client_dups_ok,no. Defaults to auto
ami.relay.fh.tibcoems.props.ackMode=auto