PickupNode listens to an input XQ for JMS messages, each carrying an arbitrary URI, as requests to retrieve messages. For each incoming message, the URI identifies the data source. PickupNode looks up the URI in its local cache to find the XQueue established as the transmit queue for that data source. If the cache has an XQueue for the data source, PickupNode packs all the messages in that XQueue into the body of the incoming message and sends it back as the response. Otherwise, PickupNode generates a new request containing the URI and a newly created XQueue for the data source. The request is sent to the outlink connected to a ReceiverPool to ask for a new receiver thread. Once the receiver is instantiated, PickupNode collects the response from the ReceiverPool. The response is supposed to contain the receiver thread that receives the messages from that data source. PickupNode caches the thread and the id of the XQueue with the URI as the key. In this way, PickupNode is able to receive JMS messages on demand from arbitrary data sources via the receivers, based on the URIs and the predefined rulesets. The received messages stay cached in their XQueues, available for pickup, until they expire.
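The lookup-or-request behaviour described above can be pictured with a small Python sketch. It is only a toy model under stated assumptions: `PickupCache`, `pickup` and `pool_requests` are hypothetical names, a `deque` stands in for an XQueue, and the `"requested"` status with an empty body is a placeholder, since the text does not say what the first requester gets back.

```python
from collections import deque


class PickupCache:
    """Toy model of the URI-keyed cache; not the real PickupNode class."""

    def __init__(self):
        self.queues = {}         # URI -> deque standing in for an XQueue
        self.pool_requests = []  # requests that would go to the pool outlink

    def pickup(self, uri):
        """Return (status, body) for a pickup request on the given URI."""
        queue = self.queues.get(uri)
        if queue is None:
            # Unknown data source: create its queue and ask for a receiver.
            self.queues[uri] = deque()
            self.pool_requests.append({"URI": uri, "XQueue": self.queues[uri]})
            return "requested", ""
        # Known data source: drain everything cached so far into the body.
        body = "".join(queue)
        queue.clear()
        return "ok", body
```

Only the first request for a URI produces a pool request; later requests for the same URI are served from the cached queue.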
PickupNode has two types of outlinks, position-fixed and non-fixed. There are three position-fixed outlinks: pool for all requests to the ReceiverPool, failure for the messages that failed in the request process, and nohit for the messages not covered by any ruleset. The non-fixed outlinks are transmit queues for the on-demand data sources.
PickupNode also contains a number of predefined rulesets. These rulesets categorize messages into non-overlapping groups, so each ruleset defines a unique message group. A ruleset also specifies how to construct the URI and the properties for the new receivers. For the messages that do not match any of the defined rulesets, PickupNode always creates an extra ruleset, nohit, to handle them. All the nohit messages are therefore routed to the nohit outlink, and the downstream node at nohit is supposed to handle them.
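As an illustration of ruleset selection with the nohit fallback, here is a minimal Python sketch. It makes several assumptions that the text does not confirm: only `JMSPropertyGroup` is evaluated, the patterns inside one group must all match, several groups are alternatives, the first matching ruleset wins, and a ruleset without any group never matches. The function name `select_ruleset` is hypothetical.

```python
import re


def select_ruleset(rulesets, properties):
    """Return the name of the first matching ruleset, or "nohit" if none matches."""
    for rule in rulesets:
        # Assumption: a ruleset without JMSPropertyGroup never matches here.
        for group in rule.get("JMSPropertyGroup", []):
            # A group matches when every listed property exists and matches.
            if all(
                key in properties and re.search(pattern, properties[key])
                for key, pattern in group.items()
            ):
                return rule["Name"]
    return "nohit"
```

Because the rulesets are meant to be non-overlapping, the order of evaluation should not matter for a well-formed configuration; the first-match rule here only makes the sketch deterministic.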
The URI identifies the data source. To construct the URI for an arbitrary data source, each ruleset contains two lists of format operations. The first is the list of templates named URITemplate. The other is the list of substitutions named URISubstitution. Each URITemplate appends the previous non-empty text to the URI and sets the initial text of the next piece for its corresponding URISubstitution to modify. URITemplates and URISubstitutions are associated by their positions in the lists. Either a URITemplate or a URISubstitution can be null, meaning no action, as a placeholder. Therefore, you can insert multiple null URITemplates so that the associated URISubstitutions modify the same text in turn. If any of the operations fails, the message is routed to the failure outlink. In addition, a ruleset may contain a DefaultProperty map of static or dynamic properties for the new receivers. If the URI string is not in the cache, PickupNode first tries to resolve all the dynamic variables in the DefaultProperty map from the incoming message. Then it adds the DefaultProperty to the request for the new receiver, so that the extra properties can be passed over to the receiver pool.
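The positional pairing of templates and substitutions can be sketched in Python as follows. This is one reading of the description, not the actual implementation: the helper names `fill`, `substitute` and `build_uri` are hypothetical, a `##name##` placeholder with no matching message property is assumed to resolve to an empty string, and only the plain `s/pattern/replacement/` form is handled (flags are ignored and every occurrence is replaced).

```python
import re


def fill(template, props):
    """Replace each ##name## with props[name]; missing names become ""."""
    return re.sub(r"##([^#]+)##", lambda m: str(props.get(m.group(1), "")), template)


def substitute(expr, text):
    """Apply a Perl-style s/pattern/replacement/ expression to text."""
    # Split on unescaped slashes only, so "\/" stays inside the pattern.
    parts = re.split(r"(?<!\\)/", expr)
    if len(parts) != 4 or parts[0] != "s":
        raise ValueError("unsupported substitution: " + expr)
    return re.sub(parts[1], parts[2], text)


def build_uri(templates, substitutions, props):
    """Walk both lists by position and concatenate the resulting pieces."""
    uri, text = "", ""
    for i in range(max(len(templates), len(substitutions))):
        template = templates[i] if i < len(templates) else None
        sub = substitutions[i] if i < len(substitutions) else None
        if template is not None:
            uri += text                   # commit the previous piece
            text = fill(template, props)  # start the next piece
        if sub is not None:
            text = substitute(sub, text)  # edit the current piece in place
    return uri + text
```

With the `URITemplate` and `URISubstitution` lists of the `wmq_q` ruleset in the example and the properties `HostName=mqhost` and `QueueName=Q1`, the second template yields a bare `:` that the second substitution erases, so the sketch returns `wmq://mqhost/?QueueName=Q1&Operation=get`. In this sketch a malformed substitution raises `ValueError`, which corresponds to routing the message to the failure outlink.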
For each new data source, PickupNode creates an Object message as the request, containing the URI and the XQueue, as well as the default properties if they are defined. The request is sent to the ReceiverPool via the pool outlink. PickupNode then frequently checks for the response to each outstanding request. The response is supposed to carry the status and the receiver thread for the new data source. PickupNode uses the thread to monitor the status of the receiver. If the response does not have the thread, PickupNode routes the messages to the failure outlink and removes the XQueue and the URI from the cache. The same thing happens if the request for a new receiver times out. MaxRetry controls when the request to the pool times out. It also controls the timeout on a dead receiver thread.
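The handling of outstanding requests might look roughly like the Python sketch below. The data layout and the name `check_outstanding` are hypothetical, and the exact meaning of MaxRetry is an assumption: here a request is declared timed out once it has been checked more than `max_retry` times without a response.

```python
def check_outstanding(outstanding, cache, max_retry):
    """Run one periodic check; return the URIs routed to failure."""
    failed = []
    for uri in list(outstanding):
        request = outstanding[uri]
        response = request.get("response")
        if response is None:
            # No answer yet: count this check and give up after max_retry.
            request["retries"] = request.get("retries", 0) + 1
            if request["retries"] <= max_retry:
                continue
        elif response.get("thread") is not None:
            # The pool delivered a receiver thread: keep it with the queue.
            cache[uri]["thread"] = response["thread"]
            del outstanding[uri]
            continue
        # Either timed out or answered without a thread: clean up.
        del outstanding[uri]
        cache.pop(uri, None)
        failed.append(uri)
    return failed
```

Both failure paths end in the same cleanup, which mirrors the statement that a timeout is treated like a response without a thread.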
PickupNode also maintains an active set of XQueues as the receiving queues for all data sources. In front of each XQueue, there is at least one receiver thread receiving the messages from the data source. The messages are available for applications to pick up. If PickupNode has not received any pickup requests for a queue, it marks the queue idle, no matter how many messages are available in it. All dynamic queues are checked at every heartbeat. If one of them has been idle for longer than MaxIdleTime, the queue is stopped. Its receiver thread, the queue itself and any leftover messages are then removed from the cache.
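A minimal Python sketch of the idle check follows. The dictionary layout, the `last_pickup` timestamp and the name `evict_idle` are assumptions made for illustration; the real node tracks queues and receiver threads in its own structures.

```python
def evict_idle(queues, now, max_idle_time):
    """Stop and drop every queue idle for longer than max_idle_time seconds."""
    removed = []
    for uri in list(queues):
        entry = queues[uri]
        if now - entry["last_pickup"] > max_idle_time:
            entry["stopped"] = True  # stand-in for stopping queue and receiver
            del queues[uri]          # leftover messages go with the entry
            removed.append(uri)
    return removed
```

Idleness is measured from the last pickup request only, so a queue that still holds messages is evicted all the same once nobody has asked for them within the limit.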
You are free to choose any names for the three fixed outlinks, but PickupNode always assumes that the first outlink is pool, the second is failure and the third is nohit. The name of nohit may be the same as that of failure, but neither nohit nor failure may share its name with pool. The rest of the outlinks are for the on-demand data sources.
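The positional rule and the naming constraints can be checked with a short Python sketch. The function `assign_outlinks` is hypothetical; it assumes that an outlink is configured either as a plain name or as a map with a `Name` key, as in the example configuration, and that all three fixed outlinks are listed explicitly.

```python
def assign_outlinks(outlinks):
    """Map the first three outlink names to their fixed roles and validate them."""
    names = [o if isinstance(o, str) else o["Name"] for o in outlinks]
    if len(names) < 3:
        raise ValueError("pool, failure and nohit outlinks are required")
    pool, failure, nohit = names[:3]
    # failure and nohit may coincide, but neither may coincide with pool.
    if pool in (failure, nohit):
        raise ValueError("failure and nohit must not share a name with pool")
    return {"pool": pool, "failure": failure, "nohit": nohit, "dynamic": names[3:]}
```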
Apart from the common properties, there are four implementation-specific properties for PickupNode.
Property Name | Data Type | Requirement | Description | Examples |
---|---|---|---|---|
Heartbeat | integer | optional | interval in sec to check outstanding requests | 30 (default: 60) |
MaxNumberReceiver | integer | optional | max number of receivers | 32 (default: 256) |
MaxRetry | integer | optional | max number of retries | 2 |
RCField | string | optional | name of the field for return code | |
The pickup operation is executed via the predefined rulesets. Therefore, the configuration of the rulesets is critical to the operation of PickupNode. Here is the complete list of ruleset properties for PickupNode.
Property Name | Data Type | Requirement | Description | Examples |
---|---|---|---|---|
Name | alphanumeric with no spaces | mandatory | name of the ruleset | event |
MaxIdleTime | integer | optional | max number of seconds of idle state | 900 |
Capacity | integer | optional | max number of connections | 8 |
MaxRetry | integer | optional | max number of retries | 2 |
ResultTemplate | string | optional | template for result | ##body## |
URITemplate | list | optional | list of templates for URI of new receivers | see example |
URISubstitution | list | optional | list of substitutions for URI of new receivers | see example |
DefaultProperty | map | optional | default property map for new receivers | see example |
PreferredOutLink | alphanumeric with no spaces | mandatory for bypass only | name of the preferred outlink | bypass |
FormatterArgument | list | optional | list of post format operations | see example |
JMSPropertyGroup | list | optional | list of pattern groups on properties to select messages | see example |
XJMSPropertyGroup | list | optional | list of pattern groups on properties to exclude messages | see example |
PatternGroup | list | optional | list of pattern groups on body to select messages | see example |
XPatternGroup | list | optional | list of pattern groups on body to exclude messages | see example |
StringProperty | map | optional | for setting the user properties on the messages | see example |
Here is an example of PickupNode:
{
  "Name": "node_pickup",
  "ClassName": "org.qbroker.node.PickupNode",
  "Description": "test",
  "Operation": "pickup",
  "LinkName": "pickup",
  "Capacity": "32",
  "MaxNumberReceiver": "512",
  "Heartbeat": "60",
  "MaxRetry": "2",
  "DisplayMask": "0",
  "XAMode": "1",
  "Debug": "25",
  "Ruleset": [{
    "Name": "tcp",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "300",
    "JMSPropertyGroup": [{ "asset": "^[^:]*$", "Port": "^\\d+$" }],
    "URITemplate": [ "tcp://##asset##:##Port##", "/?Operation=read", "&user=##login##" ],
    "ResultTemplate": "##body##\n"
  },{
    "Name": "event",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^EVENT$", "asset": "^[^:]*$" }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log:///var/log/qbroker/QFlow_EVENT.log?Operation=fetch",
        "TimePattern": "yyyy-MM-dd HH:mm:ss,SSS"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "win_mon",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^Agent$", "asset": "^[^:]*$", "logdir": "^[cC]:" }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##&Agent=##service##" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log:///c:/home/qbroker/logs/MonitorAgent.log?Operation=fetch",
        "TimePattern": "yyyy-MM-dd HH:mm:ss,SSS"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "mon",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^Agent$", "asset": "^[^:]*$" }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##&Agent=##service##" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log:///var/log/qbroker/MonitorAgent.log?Operation=fetch",
        "TimePattern": "yyyy-MM-dd HH:mm:ss,SSS"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "wsh",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^WSH$", "asset": "^[^:]*$", "logfile": ".", "pattern": "." }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##&WSH=##service##&name=##name##" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log://##logfile##?Operation=fetch",
        "TimePattern": "##pattern##"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "admin",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^Console$", "asset": "^[^:]*$" }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##&Flow=Console" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log:///var/log/qbroker/QFlow_CONSOLE.log?Operation=fetch",
        "TimePattern": "yyyy-MM-dd HH:mm:ss,SSS"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "flow",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "150",
    "JMSPropertyGroup": [{ "short_name": "^Flow$", "asset": "^[^:]*$" }],
    "URITemplate": [ "tcp://##asset##:6625", "/?Operation=read", "&user=##login##&Flow=##service##" ],
    "DefaultProperty": {
      "DestinationProperty": {
        "URI": "log:///var/log/qbroker/QFlow_##service##.log?Operation=fetch",
        "TimePattern": "yyyy-MM-dd HH:mm:ss,SSS"
      }
    },
    "ResultTemplate": "##body##\n"
  },{
    "Name": "wmq_q",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "300",
    "JMSPropertyGroup": [{ "HostName": "^.+$", "QueueName": "^.+$" }],
    "URITemplate": [ "wmq://##HostName##", ":##port##", "/?QueueName=##QueueName##", "&Operation=get" ],
    "URISubstitution": [ "s/^wmq:\/\/$//", "s/^:$//", "s/^QueueName=$//" ],
    "ResultTemplate": "##body##\n"
  },{
    "Name": "wmq_t",
    "Capacity": "1024",
    "XAMode": "1",
    "MaxIdleTime": "300",
    "JMSPropertyGroup": [{ "HostName": "^.+$", "TopicName": "^.+$" }],
    "URITemplate": [ "wmq://##HostName##", ":##port##", "/?TopicName=##TopicName##", "&Operation=sub" ],
    "URISubstitution": [ "s/^wmq:\/\/$//", "s/^:$//", "s/^TopicName=$//" ],
    "ResultTemplate": "##body##\n"
  }],
  "OutLink": ["ipool", {
    "Name": "failure",
    "Capacity": "48",
    "Partition": "20,4"
  },{
    "Name": "nohit",
    "Capacity": "32",
    "Partition": "20,4"
  }]
}