Autocreate transaction values if missing from JSON payload
This commit is contained in:
28
README.md
28
README.md
@@ -29,7 +29,7 @@ bindings {
|
||||
The test module contains all the integration tests for the module.
|
||||
|
||||
|
||||
### Docker
|
||||
## Docker
|
||||
|
||||
There are two projects available for building the executor and the ledger as docker images.
|
||||
|
||||
@@ -38,3 +38,29 @@ There are two projects available for building the executor and the ledger as doc
|
||||
|
||||
For more information how to run and configure them see their corresponding READMEs.
|
||||
|
||||
|
||||
## Docker Compose
|
||||
|
||||
The project also comes with a *docker-compose.yml* file for running the docker images.
|
||||
|
||||
To run the images first you need to buid them and publish them to your local docker registry.
|
||||
|
||||
You can do that by running:
|
||||
|
||||
1) ``./gradlew dbt-executor:distDocker``
|
||||
2) ``./gradlew dbt-ledger:distDocker``
|
||||
|
||||
After you have generated the Docker images you can run them both by running ``docker-compose up`` in the root folder.
|
||||
|
||||
The ledger will be available at http://localhost:5050/ledger and the executor
|
||||
will be available at http://localhost:5051/executor
|
||||
|
||||
|
||||
## How to make a distributed transaction (in a ratpack application)
|
||||
|
||||
Once you have the executor and the ledger configured and running, you can start making transactions to the database
|
||||
the executor is connected to.
|
||||
|
||||
By default the executor will set up an in-memory database without any tables, so the first transaction you want to make
|
||||
is to create the table you want.
|
||||
|
||||
|
||||
@@ -53,16 +53,24 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
ctx.request.body.then { body ->
|
||||
def mapper = ctx.get(ObjectMapper)
|
||||
BlockTransaction transaction = mapper.readValue(body.text, BlockTransaction)
|
||||
log.info("Recieved transaction $transaction.id")
|
||||
if(!transaction.id) {
|
||||
log.info("Recieved null transaction id, creating new transaction")
|
||||
newTransaction(ctx, transaction)
|
||||
} else {
|
||||
log.info("Recieved transaction $transaction.id, updating it")
|
||||
ledgerService.fetchTransaction(transaction.id).then { Optional<BlockTransaction> t ->
|
||||
t.present ? updateTransaction(ctx, transaction) : newTransaction(ctx, transaction)
|
||||
t.present ? updateTransaction(ctx, t.get(), transaction) : newTransaction(ctx, transaction)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateTransaction(Context ctx, BlockTransaction transaction) {
|
||||
private void updateTransaction(Context ctx, BlockTransaction oldTransaction, BlockTransaction newTransaction) {
|
||||
|
||||
log.info "Transaction $newTransaction.id exists, updating transaction"
|
||||
def transaction = cloneTransaction(oldTransaction, newTransaction)
|
||||
|
||||
def ledgerService = ctx.get(LedgerService)
|
||||
log.info "Transaction $transaction.id exists, updating transaction"
|
||||
ledgerService.updateTransaction(transaction).then {
|
||||
log.info("Transaction $it updated in ledger")
|
||||
if (transaction.completed & !(transaction.executed || transaction.rolledback)) {
|
||||
@@ -74,14 +82,12 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
}
|
||||
}
|
||||
|
||||
private void newTransaction(Context ctx, BlockTransaction transaction) {
|
||||
if(transaction.executed || transaction.rolledback) {
|
||||
log.error("Tried to create a already executed transaction $transaction.id")
|
||||
throw new IllegalArgumentException("Cannot create a transaction with executed or rolledback status")
|
||||
}
|
||||
private void newTransaction(Context ctx, BlockTransaction newTransaction) {
|
||||
|
||||
log.info("Creating new transaction")
|
||||
def transaction = cloneTransaction(new BlockTransaction(), newTransaction)
|
||||
|
||||
def ledgerService = ctx.get(LedgerService)
|
||||
log.info("Creating new transaction")
|
||||
ledgerService.newTransaction(transaction).then {
|
||||
log.info("Transaction $it added to ledger")
|
||||
if(transaction.completed){
|
||||
@@ -92,4 +98,23 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static BlockTransaction cloneTransaction(BlockTransaction oldTransaction, BlockTransaction newTransaction) {
|
||||
def transaction = new BlockTransaction()
|
||||
transaction.id = oldTransaction.id
|
||||
transaction.executed = oldTransaction.executed
|
||||
transaction.completed = oldTransaction.completed
|
||||
transaction.rolledback = oldTransaction.rolledback
|
||||
|
||||
newTransaction.queries.each { q ->
|
||||
def query = transaction.queries.isEmpty() ?
|
||||
new BlockTransaction.Query(oldTransaction, q.query) :
|
||||
new BlockTransaction.Query(transaction.queries.last(), q.query)
|
||||
query.resultError = q.resultError
|
||||
query.result = q.result
|
||||
transaction.queries << query
|
||||
}
|
||||
|
||||
transaction
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,4 +16,4 @@ services:
|
||||
environment:
|
||||
- RATPACK_DBT__EXECUTOR__REMOTE_URL=http://executor:5051/executor
|
||||
ports:
|
||||
- "5050"
|
||||
- "5050:5050"
|
||||
Reference in New Issue
Block a user