diff --git a/dbt-core/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy b/dbt-core/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy index 390cf03..3b600bf 100644 --- a/dbt-core/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy +++ b/dbt-core/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy @@ -35,11 +35,11 @@ class BlockTransaction implements Serializable { } /** - * End the current transaction + * Commit the transaction to database * * @return */ - void end() { + void commit() { completed = true } diff --git a/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy b/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy index de9737f..94ef572 100644 --- a/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy +++ b/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy @@ -5,10 +5,12 @@ import com.devsoap.dbt.data.BlockTransaction import com.fasterxml.jackson.databind.ObjectMapper import groovy.util.logging.Slf4j import ratpack.exec.Promise +import ratpack.func.Pair import ratpack.handling.Context import ratpack.handling.Handler import ratpack.http.Status import ratpack.http.client.HttpClient +import ratpack.http.client.ReceivedResponse import ratpack.jdbctx.Transaction import javax.inject.Inject @@ -41,21 +43,21 @@ class ExecutorHandler implements Handler { return } - executeCommands(ds, transaction).then { - transaction.executed = true - - // Notify ledger of result + executeCommands(ds, transaction).onError { e -> + log.info("Sending rolled back transaction to ledger") + println mapper.writeValueAsString(transaction) + client.post(config.ledger.remoteUrl.toURI(), { spec -> + spec.body.text(mapper.writeValueAsString(transaction)) + }).then { + ctx.error(e) + } + }.then { log.info("Updating ledger with execution result") client.post(config.ledger.remoteUrl.toURI(), { spec -> spec.body.text(mapper.writeValueAsString(transaction)) }).then { - if(it.status != Status.OK) { - log.error("Failed to update ledger with execution result for transaction $transaction.id") - } + ctx.response.send(mapper.writeValueAsString(transaction)) } - - // Return transaction with result - ctx.response.send(mapper.writeValueAsString(transaction)) } } } @@ -81,31 +83,31 @@ class ExecutorHandler implements Handler { def txDs = Transaction.dataSource(ds) def tx = Transaction.create { ds.connection } tx.wrap { - Promise.sync { - try { - transaction.queries.each { block -> - try{ + try { + transaction.queries.each { block -> + try{ + log.info "Executing $block.query ..." + if(block.query.toLowerCase().startsWith("select")){ + log.info('Saving result from Select query') def result = txDs.connection .createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE) .executeQuery(block.query) block.result = toMap(result) - log.info "Executing $block.query ..." - if(block.query.toLowerCase().startsWith("select")){ - log.info('Saving result from Select query') - } else { - txDs.connection.createStatement().execute(block.query) - } - } catch (Exception e) { - block.resultError = e.message - throw e + } else { + txDs.connection.createStatement().execute(block.query) } + } catch (Exception e) { + block.resultError = e.message + throw e } - } catch (Exception e) { - log.error("Failed to execute transaction $transaction.id, transaction rolled back", e) - tx.rollback() - transaction.rolledback = true } - transaction + transaction.executed = true + Promise.sync { transaction } + } catch (Exception e) { + log.error("Failed to execute transaction $transaction.id, transaction rolled back", e) + tx.rollback() + transaction.rolledback = true + Promise.error(e) } } } diff --git a/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy b/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy index 180947f..e38966d 100644 --- a/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy +++ b/dbt-core/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy @@ -5,6 +5,7 @@ import com.devsoap.dbt.data.BlockTransaction import com.devsoap.dbt.services.LedgerService import com.fasterxml.jackson.databind.ObjectMapper import groovy.util.logging.Slf4j +import ratpack.exec.Promise import ratpack.handling.Context import ratpack.handling.Handler import ratpack.http.HttpMethod @@ -24,45 +25,55 @@ class LedgerUpdateTransactionHandler implements Handler { @Override void handle(Context ctx) throws Exception { - ctx.with { - if(ctx.request.method == HttpMethod.POST) { - if(!config.executor.remoteUrl) { - throw new RuntimeException("Executor URL is not set, cannot update transaction") - } + if (ctx.request.method != HttpMethod.POST) { + ctx.next() + return + } - def ledgerService = get(LedgerService) - request.body.then { body -> - def mapper = get(ObjectMapper) - def transaction = mapper.readValue(body.text, BlockTransaction) - log.info("Recieved transaction $transaction.id") - ledgerService.fetchTransaction(transaction.id).then { - if(it.present) { - log.info "Transaction $transaction.id exists, updating transaction" - ledgerService.updateTransaction(transaction).then { - log.info("Transaction $it updated in ledger") - if(transaction.completed && !transaction.executed){ - log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") - redirect(config.executor.remoteUrl) - } else { - render(Jackson.json(transaction)) - } - } - } else { - log.info("Creating new transaction") - ledgerService.newTransaction(transaction).then { - log.info("Transaction $it added to ledger") - if(transaction.completed && !transaction.executed){ - log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") - redirect(config.executor.remoteUrl) - } else { - render(Jackson.json(transaction)) - } - } - } - } - } + if (!config.executor.remoteUrl) { + throw new RuntimeException("Executor URL is not set, cannot update transaction") + } + + def ledgerService = ctx.get(LedgerService) + ctx.request.body.then { body -> + def mapper = ctx.get(ObjectMapper) + BlockTransaction transaction = mapper.readValue(body.text, BlockTransaction) + log.info("Recieved transaction $transaction.id") + ledgerService.fetchTransaction(transaction.id).then { Optional t -> + t.present ? updateTransaction(ctx, transaction) : newTransaction(ctx, transaction) + } + } + } + + private void updateTransaction(Context ctx, BlockTransaction transaction) { + def ledgerService = ctx.get(LedgerService) + log.info "Transaction $transaction.id exists, updating transaction" + ledgerService.updateTransaction(transaction).then { + log.info("Transaction $it updated in ledger") + if (transaction.completed & !(transaction.executed || transaction.rolledback)) { + log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") + ctx.redirect(config.executor.remoteUrl) } else { - next() + ctx.render(Jackson.json(transaction)) + } + } + } + + private void newTransaction(Context ctx, BlockTransaction transaction) { + if(transaction.executed || transaction.rolledback) { + log.error("Tried to create a already executed transaction $transaction.id") + throw new IllegalArgumentException("Cannot create a transaction with executed or rolledback status") + } + + def ledgerService = ctx.get(LedgerService) + log.info("Creating new transaction") + ledgerService.newTransaction(transaction).then { + log.info("Transaction $it added to ledger") + if(transaction.completed){ + log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") + ctx.redirect(config.executor.remoteUrl) + } else { + ctx.render(Jackson.json(transaction)) } } } diff --git a/dbt-core/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy b/dbt-core/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy index 43f1286..b02a532 100644 --- a/dbt-core/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy +++ b/dbt-core/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy @@ -128,7 +128,7 @@ class TransactionManagerService implements Service { } TransactionBuilder complete() { - transaction.end() + transaction.commit() this } diff --git a/dbt-test/build.gradle b/dbt-test/build.gradle index 7ec9ac2..d77dc35 100644 --- a/dbt-test/build.gradle +++ b/dbt-test/build.gradle @@ -1,13 +1,14 @@ apply plugin: 'org.flywaydb.flyway' dependencies { - testCompile project(':dbt-core') + compile project(':dbt-core') - testCompile ratpack.dependency('h2') - testCompile ratpack.dependency('jdbc-tx') + runtime ratpack.dependency('handlebars') + runtime ratpack.dependency('h2') + runtime ratpack.dependency('jdbc-tx') - testCompile 'org.flywaydb:flyway-core:5.0.7' - testRuntime 'org.slf4j:slf4j-simple:1.7.25' + compile 'org.flywaydb:flyway-core:5.0.7' + runtime 'org.slf4j:slf4j-simple:1.7.25' testCompile ratpack.dependency('test') testCompile 'org.spockframework:spock-core:1.0-groovy-2.4' diff --git a/dbt-test/src/ratpack/Ratpack.groovy b/dbt-test/src/ratpack/Ratpack.groovy index cfef01e..619a79d 100644 --- a/dbt-test/src/ratpack/Ratpack.groovy +++ b/dbt-test/src/ratpack/Ratpack.groovy @@ -18,8 +18,6 @@ ratpack { bindings { - module SessionModule - module (DBTModule) { it.ledger.remoteUrl = 'http://localhost:8888/ledger' it.executor.remoteUrl = 'http://localhost:8888/executor' diff --git a/dbt-test/src/test/groovy/com/devsoap/dbt/ExecutorSpec.groovy b/dbt-test/src/test/groovy/com/devsoap/dbt/ExecutorSpec.groovy index 8ddcb30..061e9a8 100644 --- a/dbt-test/src/test/groovy/com/devsoap/dbt/ExecutorSpec.groovy +++ b/dbt-test/src/test/groovy/com/devsoap/dbt/ExecutorSpec.groovy @@ -18,7 +18,7 @@ class ExecutorSpec extends Specification { setup: def transaction = new BlockTransaction() transaction.execute("SELECT * FROM LOGS") - transaction.end() + transaction.commit() when: String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction)) diff --git a/dbt-test/src/test/groovy/com/devsoap/dbt/LedgerSpec.groovy b/dbt-test/src/test/groovy/com/devsoap/dbt/LedgerSpec.groovy index 5380034..89e10a2 100644 --- a/dbt-test/src/test/groovy/com/devsoap/dbt/LedgerSpec.groovy +++ b/dbt-test/src/test/groovy/com/devsoap/dbt/LedgerSpec.groovy @@ -3,15 +3,8 @@ package com.devsoap.dbt import com.devsoap.dbt.data.BlockTransaction import com.fasterxml.jackson.databind.ObjectMapper -import groovy.json.JsonSlurper import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest -import ratpack.impose.Imposition -import ratpack.impose.Impositions -import ratpack.impose.ImpositionsSpec -import ratpack.impose.ServerConfigImposition -import ratpack.server.RatpackServer import spock.lang.AutoCleanup -import spock.lang.Shared import spock.lang.Specification class LedgerSpec extends Specification { @@ -41,7 +34,7 @@ class LedgerSpec extends Specification { setup: def transaction = new BlockTransaction() transaction.execute("SELECT * FROM LOGS") - transaction.end() + transaction.commit() when: String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction)) @@ -56,7 +49,7 @@ class LedgerSpec extends Specification { setup: def transaction = new BlockTransaction() transaction.execute("SELECT * FROM LOGS") - transaction.end() + transaction.commit() when: def response = mapper.readValue(aut.httpClient.requestSpec { spec -> spec.body.text(mapper.writeValueAsString(transaction)) @@ -73,7 +66,7 @@ class LedgerSpec extends Specification { def transaction = new BlockTransaction() transaction.execute("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (1, 'HELLO')") transaction.execute("SELECT * FROM LOGS") - transaction.end() + transaction.commit() when: String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction))