JDBCReceiver

JDBCReceiver is a JMS message producer that repeatedly queries a JDBC data source and converts the query result into JMS messages, one message per record. The new JMS messages are put into the internal XQueue as the output. JDBCReceiver supports all JDBC implementations via DBDriver. Only one operation, select, is supported. To select repeatedly, DependencyGroup is required; it controls whether the select operation is skipped or not. Both PauseTime and the Step of the report control the frequency of the operation. It also supports flow control and XA. Fault tolerance with auto reconnections is built in.

Apart from the common properties, JDBCReceiver has a number of implementation-specific properties.
| Property Name | Data Type | Requirement | Description | Examples |
|---|---|---|---|---|
| DBDriver | string | mandatory | full class name of the JDBC driver | oracle.jdbc.driver.OracleDriver |
| SQLQuery | string | mandatory | SQL query statement | SELECT name, status FROM table |
| RCField | string | optional | name of the field to store the return code (0 for success) | (default: ReturnCode) |
| ResultField | string | optional | name of the field to store the number of rows returned | (default: SQLResult) |
| IDColumn | string | optional | name of the column whose value is unique and serves as an ID | MESSAGE_ID |
| KeyColumn | string | optional | name of the column whose value increases monotonically | END_TIME |
| BodyColumn | string | optional | name of the column for the message body | CONTENT |
| TimePattern | string | optional | simple time pattern to parse the KeyColumn value as a timestamp | yyyy-MM-dd HH:mm:ss |
| ReferenceFile | string | optional | filename to store the state info | /var/log/qbroker/.status/apns.dat |
| DependencyGroup | list | optional | list of dependencies to control the frequency of operations | see example |
| FormatterArgument | list | optional | list of formatters to format selected messages | see example |
where KeyColumn specifies the name of the column whose value is used as a monotonically increasing number or timestamp. If KeyColumn is defined, SQLQuery is expected to contain one parameter in its WHERE clause to select new records. The receiver sets the value of that parameter before each query. If ReferenceFile is also defined, the state info is persisted to the file. Be sure to define DependencyGroup for repeated selects. Otherwise, the receiver stops after the first query.
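
The interplay of KeyColumn, the query parameter and ReferenceFile can be illustrated with a minimal sketch. It is not the JDBCReceiver implementation: it uses Python with an in-memory SQLite database in place of a JDBC data source, and the table layout, the `?` placeholder, the strict `>` comparison, the state file format and the helper names (`load_state`, `save_state`, `poll_once`) are all assumptions made for illustration.

```python
import os
import sqlite3
import tempfile

def load_state(path, default):
    """Return the last persisted key value, or default if nothing is stored."""
    if os.path.exists(path):
        with open(path) as f:
            text = f.read().strip()
            if text:
                return text
    return default

def save_state(path, key):
    """Persist the latest key value (hypothetical one-line file format)."""
    with open(path, "w") as f:
        f.write(key)

def poll_once(conn, query, key_column, state_file,
              default_key="1970-01-01 00:00:00"):
    """Run one incremental select and return the new rows, one dict per row."""
    last_key = load_state(state_file, default_key)
    cur = conn.execute(query, (last_key,))  # set the parameter before the query
    columns = [d[0] for d in cur.description]
    rows = [dict(zip(columns, r)) for r in cur.fetchall()]
    if rows:
        # The query orders by the key column, so the last row holds the newest key.
        save_state(state_file, rows[-1][key_column])
    return rows

# Demo data: a stand-in table with an ID column and a key column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE message_log (MESSAGE_ID INTEGER, END_TIME TEXT)")
conn.executemany("INSERT INTO message_log VALUES (?, ?)",
                 [(1, "2024-01-01 10:00:00"), (2, "2024-01-01 10:01:00")])
QUERY = ("SELECT MESSAGE_ID, END_TIME FROM message_log "
         "WHERE END_TIME > ? ORDER BY END_TIME")
state_file = os.path.join(tempfile.mkdtemp(), "demo.dat")

first = poll_once(conn, QUERY, "END_TIME", state_file)   # picks up rows 1 and 2
second = poll_once(conn, QUERY, "END_TIME", state_file)  # nothing new
conn.execute("INSERT INTO message_log VALUES (3, '2024-01-01 10:02:00')")
third = poll_once(conn, QUERY, "END_TIME", state_file)   # picks up row 3 only
```

Because the key value is written to the state file after each successful poll, a restarted process in this sketch resumes from the last key it saw instead of re-reading the whole table.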

Here is an example of JDBCReceiver:

{
  "Name": "rcvr_db",
  "ClassName": "org.qbroker.receiver.JDBCReceiver",
  "URI": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=devdb1.qbroker.org)(PORT=1530))(ADDRESS=(PROTOCOL=TCP)(HOST=devdb2.qbroker.org)(PORT=1530))(CONNECT_DATA=(failover=true)(server=dedicated)(Service_name=devdb.qbroker.org)))",
  "Username": "monitor",
  "Password": "xxxx",
  "Operation": "select",
  "SQLQuery": "SELECT message_id, status, apns_delivered_device_count AS "COUNT", to_char(send_started, 'YYYY-MM-DD HH24:MI:SS') AS "SEND_TIME", to_char(apns_send_ended, 'YYYY-MM-DD HH24:MI:SS') AS "APNS_END" FROM notify_owner.message_log where apns_send_ended >= %%TIMESTAMP:D%% ORDER BY apns_send_ended",
  "LinkName": "root",
  "Partition": "0,0",
  "Mode": "daemon",
  "XAMode": "0",
  "TextMode": "1",
  "KeyColumn": "APNS_END",
  "IDColumn": "MESSAGE_ID",
  "TimePattern": "yyyy-MM-dd HH:mm:ss",
  "ReferenceFile": "/var/log/qbroker/.status/apns.dat",
  "MaxNumberMessage": "200",
  "Tolerance": "0",
  "MaxRetry": "2",
  "DisplayMask": "0",
  "DependencyGroup": [{
    "Dependency": [{
      "Name": "skipper",
      "Type": "ActionSkipper",
      "Step": "12",
      "ActiveTime": {
        "TimeWindow": [{
          "Interval": "00:00:00-24:00:00"
        }]
      }
    }]
  }]
}
where the receiver monitors the column APNS_END for new records. There is one dependency that wakes up on every 12th heartbeat (PauseTime, 5 seconds by default). Therefore, the receiver runs the query every minute (12 x 5 = 60 seconds). The SQLQuery contains one parameter, %%TIMESTAMP:D%%, which expects the latest timestamp. The receiver gets that value from APNS_END and sets it before each query. Any records with newer timestamps are picked up by the receiver. For each new record, a new message is generated and put into the queue.
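
The timing arithmetic can be checked with a small simulation. This is only a sketch of the idea, not the ActionSkipper code: the function name `run_times` is hypothetical, and it assumes that the action fires on every heartbeat whose count is a multiple of Step.

```python
PAUSE_TIME = 5  # seconds per heartbeat (the default mentioned above)
STEP = 12       # value of "Step" in the example

def run_times(total_heartbeats, step=STEP, pause_time=PAUSE_TIME):
    """Return the elapsed seconds at which the query would run."""
    times = []
    for beat in range(1, total_heartbeats + 1):
        if beat % step == 0:  # all other heartbeats are skipped
            times.append(beat * pause_time)
    return times

print(run_times(36))  # [60, 120, 180]
```

Over 36 heartbeats (3 minutes) the query runs three times, once every 60 seconds. Under the same assumption, a larger Step or a longer PauseTime stretches the interval proportionally.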

Here is an example for FormatterArgument to retrieve PID from message body:

  "FormatterArgument": [{
    "FieldName": "PID",
    "Template": ["##body##"],
    "Substitution": ["s/\n/\r/g", "s/^.+<PublicId>(P\\d+)<.+$/$1/"]
  }]
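
To see what the two substitutions do, here is a rough Python equivalent. It is a sketch under assumptions: Python's `re` module stands in for the Perl5-style substitution engine, the sample body is made up, and `extract_pid` is a hypothetical helper name.

```python
import re

def extract_pid(body):
    """Apply the two substitutions from the example, in order."""
    # s/\n/\r/g : turn newlines into carriage returns so that "." can match
    # across what used to be line breaks.
    text = body.replace("\n", "\r")
    # s/^.+<PublicId>(P\d+)<.+$/$1/ : keep only the captured public ID.
    return re.sub(r"^.+<PublicId>(P\d+)<.+$", r"\1", text)

sample = "<Doc>\n  <PublicId>P12345</PublicId>\n</Doc>\n"
print(extract_pid(sample))  # P12345
```

In this sketch the first substitution matters because `.` does not match a newline: without it, the second pattern would not match a multi-line body and the text would be left unchanged.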