JDBCReceiver is a JMS message producer that repeatedly queries a JDBC data source and converts the query result into JMS messages, one message per record. The new JMS messages are put into the internal XQueue as the output. JDBCReceiver supports all JDBC implementations via DBDriver. Only one operation, select, is supported. In order to select repeatedly, DependencyGroup is required to control whether the select operation is skipped. Both PauseTime and the Step of the report control the frequency of the operation. JDBCReceiver also supports flow control and XA. Fault tolerance with auto reconnections is built in.
Apart from the common properties, JDBCReceiver has the following implementation-specific properties.
Property Name | Data Type | Requirement | Description | Examples |
---|---|---|---|---|
DBDriver | string | mandatory | full classname of JDBC driver | oracle.jdbc.driver.OracleDriver |
SQLQuery | string | mandatory | SQL query statement | SELECT name, status FROM table |
RCField | string | optional | field name to store return code (0 for success) | (default: ReturnCode) |
ResultField | string | optional | field name to store the number of rows returned | (default: SQLResult) |
IDColumn | string | optional | name of the column that is unique as an ID | MESSAGE_ID |
KeyColumn | string | optional | name of the column whose value increases monotonically | END_TIME |
BodyColumn | string | optional | name of the column for message body | CONTENT |
TimePattern | string | optional | simple time pattern used to parse the KeyColumn value into a timestamp | yyyy-MM-dd HH:mm:ss |
ReferenceFile | string | optional | filename to store the state info | /var/log/qbroker/.status/apns.dat |
DependencyGroup | list | optional | list of dependencies to control the frequency of operations | see example |
FormatterArgument | list | optional | list of formatters to format selected messages | see example |
Here is an example of JDBCReceiver:
{
  "Name": "rcvr_db",
  "ClassName": "org.qbroker.receiver.JDBCReceiver",
  "URI": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=devdb1.qbroker.org)(PORT=1530))(ADDRESS=(PROTOCOL=TCP)(HOST=devdb2.qbroker.org)(PORT=1530))(CONNECT_DATA=(failover=true)(server=dedicated)(Service_name=devdb.qbroker.org)))",
  "Username": "monitor",
  "Password": "xxxx",
  "Operation": "select",
  "SQLQuery": "SELECT message_id, status, apns_delivered_device_count AS \"COUNT\", to_char(send_started, 'YYYY-MM-DD HH24:MI:SS') AS \"SEND_TIME\", to_char(apns_send_ended, 'YYYY-MM-DD HH24:MI:SS') AS \"APNS_END\" FROM notify_owner.message_log where apns_send_ended >= %%TIMESTAMP:D%% ORDER BY apns_send_ended",
  "LinkName": "root",
  "Partition": "0,0",
  "Mode": "daemon",
  "XAMode": "0",
  "TextMode": "1",
  "KeyColumn": "APNS_END",
  "IDColumn": "MESSAGE_ID",
  "TimePattern": "yyyy-MM-dd HH:mm:ss",
  "ReferenceFile": "/var/log/qbroker/.status/apns.dat",
  "MaxNumberMessage": "200",
  "Tolerance": "0",
  "MaxRetry": "2",
  "DisplayMask": "0",
  "DependencyGroup": [{
    "Dependency": [{
      "Name": "skipper",
      "Type": "ActionSkipper",
      "Step": "12",
      "ActiveTime": {
        "TimeWindow": [{
          "Interval": "00:00:00-24:00:00"
        }]
      }
    }]
  }]
}
In this example, the receiver monitors the APNS_END column for new records. There is one dependency that wakes up every 12 heartbeats (one heartbeat is PauseTime, 5 sec by default). Therefore, the receiver runs the query every minute (12 x 5 = 60 sec). The SQLQuery contains one parameter, %%TIMESTAMP:D%%, which expects the latest timestamp. The receiver gets that value from APNS_END and sets it before each query. If any records with newer timestamps show up, the receiver picks them up. For each new record, a new message is generated and put into the queue.
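The polling behavior described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not JDBCReceiver's own code: an in-memory SQLite table stands in for notify_owner.message_log, a bound parameter stands in for %%TIMESTAMP:D%% (how the receiver actually renders the placeholder is not covered here), and the names `seconds_between_queries` and `WatermarkPoller`, as well as the use of the ID column to skip the boundary row that `>=` selects again, are hypothetical.

```python
import sqlite3
from datetime import datetime

# Python counterpart of the TimePattern "yyyy-MM-dd HH:mm:ss"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def seconds_between_queries(step, pause_time=5):
    """The skipper wakes up every `step` heartbeats of `pause_time` seconds."""
    return step * pause_time


class WatermarkPoller:
    """Hypothetical helper that remembers the newest KeyColumn value seen."""

    def __init__(self, conn, start):
        self.conn = conn
        self.watermark = start   # plays the role of %%TIMESTAMP:D%%
        self.seen_ids = set()    # IDs already emitted at the watermark

    def poll(self):
        """Run the select once and return one dict ("message") per new row."""
        rows = self.conn.execute(
            "SELECT message_id, status, apns_end FROM message_log "
            "WHERE apns_end >= ? ORDER BY apns_end",
            (self.watermark.strftime(TIME_FORMAT),),
        ).fetchall()
        messages = []
        for message_id, status, apns_end in rows:
            ts = datetime.strptime(apns_end, TIME_FORMAT)
            if ts == self.watermark and message_id in self.seen_ids:
                continue         # assumption: IDColumn filters the repeated row
            if ts > self.watermark:
                self.watermark = ts
                self.seen_ids = set()
            self.seen_ids.add(message_id)
            messages.append(
                {"MESSAGE_ID": message_id, "STATUS": status, "APNS_END": apns_end}
            )
        return messages
```

Calling `poll()` repeatedly returns only the rows that appeared since the previous call, and `seconds_between_queries(12)` reproduces the 12 x 5 = 60 second interval of the example.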
Here is an example of FormatterArgument that extracts the PID from the message body:
"FormatterArgument": [{ "FieldName": "PID", "Template": ["##body##"], "Substitution": ["s/\n/\r/g", "s/^.+<PublicId>(P\\d+)<.+$/$1/"] }]
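The effect of the two substitutions can be tried outside the broker. The sketch below mimics them with Python's `re` module, where `.` does not match `\n` by default but does match `\r`; that is presumably why the first substitution turns newlines into carriage returns before the second one captures the ID. The sample XML body and the function name `extract_pid` are made up for illustration, and the regex engine used by the formatter may differ in details.

```python
import re


def extract_pid(body):
    """Apply the two substitutions from the example, in order."""
    # s/\n/\r/g : replace every newline so that '.' can span the whole body
    text = re.sub(r"\n", "\r", body)
    # s/^.+<PublicId>(P\d+)<.+$/$1/ : keep only the captured ID
    return re.sub(r"^.+<PublicId>(P\d+)<.+$", r"\1", text, count=1)


# Hypothetical multi-line message body
sample = (
    "<Notification>\n"
    "  <PublicId>P10042</PublicId>\n"
    "  <Status>OK</Status>\n"
    "</Notification>\n"
)
print(extract_pid(sample))  # prints P10042
```

If the body does not contain a matching `<PublicId>` element, the second substitution does not apply and the body comes back with only its newlines replaced.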