domingo, 4 de maio de 2014

MongoDB é uma ferramenta fantástica para usar em análises em tempo real, porque você pode fazer um pré-join de múltiplas fontes de dados, documentos aninhados e em seguida, consultar esses documentos extremamente rápido. Às vezes, os sistemas utilizados para alimentar o MongoDB estão dentro de um banco de dados Oracle. Então, como é que você pode manter os dados MongoDB consistentes com Oracle e sincronizados em tempo quase real? Oracle Streams Advanced Queuing!


Há toneladas de sistemas Open Source  e proprietary queuing que poderiam ser usados​​, então por que escolher Advanced Queuing (AQ a partir de agora)? O suporte a transações. Com AQ, o enfileiramento e de  operações queue acontecem na mesma transação como qualquer outro DML que seu aplicativo Oracle está realizando tornando-se uma maneira muito simples de apenas enfileirar mensagens  para unidades de trabalho comprometidos.


O restante deste artigo percorre um exemplo usando uma tabela muito simples de clientes. Queremos documentos no MongoDB atualizados assim que os dados no Oracle forem comitados. O fluxo de dados será algo parecido com isto:


untitled

 



Antes de entrar em qualquer codificação, vamos primeiro criar a fila que será usado para comunicar operações DML para o mundo exterior (MongoDB). Este é composto por 4 etapas:

  1. Criar um tipo de objeto para a carga da mensagem

  2. Criar uma tabela de queue (fila)

  3. Crie a fila

  4. Inicie e fila


Seria algo como isto :
CREATE TYPE message_t AS OBJECT (json VARCHAR2(4000));

-- Create the queue table. This will persist the messages
-- to disk when the transaction commits.
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'myqueue_tab',
queue_payload_type => 'message_t'
);

-- Create a queue using the new queue table.
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'myqueue',
queue_table => 'myqueue_tab'
);

DBMS_AQADM.START_QUEUE(
queue_name => 'myqueue'
);
END;

Agora que a configuração da fila for feita, vamos começar a escrever algum código para enfileirar mensagens. Faremos isso com triggers na tabela do Oracle serializando o evento DML. Para facilitar as explicações, vamos serializar a operação DML para JSON. Poderíamos facilmente serializar para XML ou algum outro formato, se desejar.
CREATE TABLE customer (
customer_id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR2(40),
address VARCHAR2(40),
city VARCHAR2(40)
);

CREATE TRIGGER customer_queue_trig
AFTER INSERT OR UPDATE OR DELETE ON customer
FOR EACH ROW
DECLARE
json VARCHAR2(4000);
BEGIN
-- For simplicity sake I am just doing simple concatenations. Production
-- JSON serialization code should do character escaping (double quotes,
-- newlines, etc).
IF INSERTING OR UPDATING THEN
json := '{"id":' || :new.customer_id || ',"name":"' || :new.name || '"';
json := json||',"address":"' || :new.address || '","city":"';
json := json|| :new.city || '","dml_type":"';
json := json|| CASE WHEN INSERTING THEN 'I' ELSE 'U' END || '"}';
ELSE
json := '{"id":' || :old.customer_id || ',"dml_type":"D"}';
END IF;

enqueue_message(json);
END;

Agora que temos uma trigger de serialização dos dados, vamos olhar para o procedimento de conveniência enqueue_message que se chama na parte inferior. É uma função wrapper simples em torno DBMS_AQ.ENQUEUE:
CREATE PROCEDURE enqueue_message(payload VARCHAR2) AS
msg message_t := message_t(NULL);
msg_id RAW(16);
priority NUMBER;
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
msg.json := payload;
message_properties.priority := 1; -- give all messages same priority
DBMS_AQ.ENQUEUE(
queue_name => 'myqueue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => msg,
msgid => msg_id);
END;

O msgid é atribuído pela Oracle AQ e é retornado como uma variável OUT chamando ENQUEUE. Nós realmente não precisamos dele para nada neste exemplo simples. Vamos enfileirar uma mensagem com êxito uma vez que a inserção na tabela do cliente for comitada. Para desenfileirar isso, vamos escrever outro procedimento wrapper simples que pode ser chamado de Ruby (minha linguagem de escolha para isso).
CREATE PROCEDURE dequeue_message(payload OUT VARCHAR2) AS
msg message_t := message_t(NULL);
msg_id RAW(16);
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => msg_id
);
payload := msg.json;
END;

Neste momento só precisamos de chamar este procedimento a partir do nosso código do consumidor de fila e processar a mensagem, fazendo um upserting no documento apropriado no MongoDB. Por padrão, chamando DEQUEUE irá bloquear, esperando para sempre, até que haja uma mensagem na fila. Desde que nós não queiramos que o nosso consumidor de fila tenha que pesquisar as mensagens, este comportamento funciona bem. Você pode mudar isso para nunca ficar esperarando ou esperar com um tempo limite, definindo a dequeue_options.wait. Veja a documentação para mais detalhes. Aqui está um simples consumidor que utiliza o dequeues da Oracle, escreve para Mongo, em seguida, emite um commit na conexão Oracle, que irá remover a mensagem da fila.
#!/usr/bin/env ruby

require 'oci8'
require 'mongo'
require 'yajl'

oc = OCI8.new('queue_user', 'mypass', 'orcl')
col = Mongo::Connection.new.db('customer_db').collection('customer')

cur = oc.parse('BEGIN dequeue_message(:p); END;')
cur.bind_param(':p', nil, String, 4000)

while true
# retrieve message
cur.exec()
json = Yajl::Parser.parse(cur[':p'])

# print
puts json.inspect

if json['dml_type'] == 'D'
col.remove({'_id' => json['id']})
else
# rename customer_id key to _id
json['_id'] = json.delete('id')
json.delete('dml_type')
col.update(
{'_id' => json['_id']},
json,
{:upsert => true}
)
end

# retire do AQ. dequeue não está completo até que isso aconteça oc.commit
end

Para dimensionar horizontalmente, basta executar vários desses consumidores se o desejar. Por agora, vamos correr um consumidor e, em seguida, assistir a mudança na Collection no Mongo após cada COMMIT do Oracle.

Oracle:
INSERT INTO customer VALUES(1, 'Nathan', '123 S. Main', 'Mytown');
COMMIT;

Mongo:
db.customer.find()
{ "_id" : 1, "name" : "Nathan", "address" : "123 S. Main", "city" : "Mytown" }

Oracle:
UPDATE customer SET name = 'Luke' WHERE customer_id = 1;
COMMIT;

Mongo:
db.customer.find()
{ "_id" : 1, "name" : "Luke", "address" : "123 S. Main", "city" : "Mytown" }

Agora vamos reverter uma inserção para verificar se apenas os dados efetivados no Oracle ficam enfileirados e são puxados para dentro Mongo:
INSERT INTO customer VALUES(2, 'John Doe', '123 N. Main', 'Mytown');
ROLLBACK;

Neste caso deveremos ter apenas um documento no Mongo
db.customer.find()
{ "_id" : 1, "name" : "Luke", "address" : "123 S. Main", "city" : "Mytown" }

Ok!!! Agora para encerrar vamos apagar algumas coisas
DELETE FROM customer;
COMMIT;

Mongo:
db.customer.find()
Null

Voilá!!!!

0 comentários:

Postar um comentário

Blog Archive

SmarttNet Solution Provider. Tecnologia do Blogger.

Postagens populares

Twitter MongoDBBrazil

Total de visualizações de página