diff --git a/build.gradle b/build.gradle index 3378c60..2ccf620 100644 --- a/build.gradle +++ b/build.gradle @@ -21,6 +21,7 @@ repositories { dependencies { compile ratpack.dependency('h2') compile ratpack.dependency('jdbc-tx') + compile ratpack.dependency('session') compile 'org.flywaydb:flyway-core:5.0.7' runtime 'org.slf4j:slf4j-simple:1.7.25' diff --git a/src/main/groovy/com/devsoap/dbt/BlockTransaction.groovy b/src/main/groovy/com/devsoap/dbt/BlockTransaction.groovy deleted file mode 100644 index 7ca6c4f..0000000 --- a/src/main/groovy/com/devsoap/dbt/BlockTransaction.groovy +++ /dev/null @@ -1,62 +0,0 @@ -package com.devsoap.dbt - -import java.nio.charset.StandardCharsets -import java.security.MessageDigest - -class BlockTransaction implements Serializable { - - String id = UUID.randomUUID().toString() - - List queries = [] - - boolean completed = false - - boolean executed = false - - boolean rolledback = false - - /** - * Executes a query in the transaction - * - * @param query - * the query to execute - * @return - * the result of the query - */ - void execute(String query) { - queries << new Query(queries.empty? null : queries.last(), query) - } - - /** - * End the current transaction - * - * @return - */ - void end() { - completed = true - } - - // A block in the chain - static final class Query implements Serializable { - String data - String hash - long timeStamp - Object result - - Query() { - // For serialization - } - - Query(Query previous, String data) { - this.data = data - timeStamp = new Date().getTime() - hash = generateHash(previous?.hash) - } - - private final String generateHash(String previousHash) { - def digest = MessageDigest.getInstance('SHA-256') - def hash = digest.digest("${previousHash?:''}$timeStamp$data".getBytes(StandardCharsets.UTF_8)) - hash.encodeHex().toString() - } - } -} diff --git a/src/main/groovy/com/devsoap/dbt/DBTModule.groovy b/src/main/groovy/com/devsoap/dbt/DBTModule.groovy new file mode 100644 index 0000000..930ca31 --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/DBTModule.groovy @@ -0,0 +1,62 @@ +package com.devsoap.dbt + +import com.devsoap.dbt.actions.ExecutorChainAction +import com.devsoap.dbt.actions.LedgerChainAction +import com.devsoap.dbt.config.DBTConfig +import com.devsoap.dbt.data.LedgerData +import com.devsoap.dbt.handlers.ExecutorHandler +import com.devsoap.dbt.handlers.LedgerGetTransactionHandler +import com.devsoap.dbt.handlers.LedgerListTransactionsHandler +import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler +import com.devsoap.dbt.services.LedgerService +import com.devsoap.dbt.services.TransactionManagerService +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.inject.multibindings.Multibinder +import groovy.util.logging.Slf4j +import ratpack.guice.ConfigurableModule +import ratpack.handling.HandlerDecorator +import ratpack.server.ServerConfig +import ratpack.session.Session + +@Slf4j +class DBTModule extends ConfigurableModule { + + @Override + protected void configure() { + bind(ObjectMapper) + + bind(LedgerChainAction) + bind(LedgerGetTransactionHandler) + bind(LedgerListTransactionsHandler) + bind(LedgerUpdateTransactionHandler) + + bind(ExecutorChainAction) + bind(ExecutorHandler) + + bind(LedgerData) + + bind(LedgerService) + bind(TransactionManagerService) + + Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding() + .toInstance(HandlerDecorator.prependHandlers(LedgerChainAction)) + + Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding() + .toInstance(HandlerDecorator.prependHandlers(ExecutorChainAction)) + } + + @Override + protected DBTConfig createConfig(ServerConfig serverConfig) { + (DBTConfig) serverConfig.getAsConfigObject('/dbt', DBTConfig)?.getObject() ?: super.createConfig(serverConfig) + } + + @Override + protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) { + if(!config.executor.remoteUrl) { + config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.executor.path}" + } + if(!config.ledger.remoteUrl) { + config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.ledger.path}" + } + } +} \ No newline at end of file diff --git a/src/main/groovy/com/devsoap/dbt/actions/ExecutorChainAction.groovy b/src/main/groovy/com/devsoap/dbt/actions/ExecutorChainAction.groovy new file mode 100644 index 0000000..e1202d8 --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/actions/ExecutorChainAction.groovy @@ -0,0 +1,24 @@ +package com.devsoap.dbt.actions + +import com.devsoap.dbt.config.DBTConfig +import com.devsoap.dbt.handlers.ExecutorHandler +import com.google.inject.Inject +import groovy.util.logging.Slf4j +import ratpack.groovy.handling.GroovyChainAction + +@Slf4j +class ExecutorChainAction extends GroovyChainAction { + + private final String executorPath + + @Inject + ExecutorChainAction(DBTConfig config) { + executorPath = config.executor.path + } + + @Override + void execute() throws Exception { + log.info("Registering executor at $executorPath") + path(executorPath, ExecutorHandler) + } +} diff --git a/src/main/groovy/com/devsoap/dbt/actions/LedgerChainAction.groovy b/src/main/groovy/com/devsoap/dbt/actions/LedgerChainAction.groovy new file mode 100644 index 0000000..d55ff7a --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/actions/LedgerChainAction.groovy @@ -0,0 +1,31 @@ +package com.devsoap.dbt.actions + +import com.devsoap.dbt.config.DBTConfig +import com.devsoap.dbt.handlers.LedgerGetTransactionHandler +import com.devsoap.dbt.handlers.LedgerListTransactionsHandler +import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler +import com.google.inject.Inject +import groovy.util.logging.Slf4j +import ratpack.groovy.handling.GroovyChainAction +import ratpack.handling.Handlers + +@Slf4j +class LedgerChainAction extends GroovyChainAction { + + private final String ledgerPath + + @Inject + LedgerChainAction(DBTConfig config) { + ledgerPath = config.ledger.path + } + + @Override + void execute() throws Exception { + log.info("Registering ledger at $ledgerPath") + path(ledgerPath, Handlers.chain( + registry.get(LedgerGetTransactionHandler), + registry.get(LedgerUpdateTransactionHandler), + registry.get(LedgerListTransactionsHandler) + )) + } +} diff --git a/src/main/groovy/com/devsoap/dbt/config/DBTConfig.groovy b/src/main/groovy/com/devsoap/dbt/config/DBTConfig.groovy index 9be96a8..d1a78be 100644 --- a/src/main/groovy/com/devsoap/dbt/config/DBTConfig.groovy +++ b/src/main/groovy/com/devsoap/dbt/config/DBTConfig.groovy @@ -2,4 +2,5 @@ package com.devsoap.dbt.config class DBTConfig { LedgerConfig ledger = new LedgerConfig() + ExecutorConfig executor = new ExecutorConfig() } diff --git a/src/main/groovy/com/devsoap/dbt/config/ExecutorConfig.groovy b/src/main/groovy/com/devsoap/dbt/config/ExecutorConfig.groovy new file mode 100644 index 0000000..982905d --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/config/ExecutorConfig.groovy @@ -0,0 +1,6 @@ +package com.devsoap.dbt.config + +class ExecutorConfig { + String path = 'executor' + String remoteUrl +} diff --git a/src/main/groovy/com/devsoap/dbt/config/LedgerConfig.groovy b/src/main/groovy/com/devsoap/dbt/config/LedgerConfig.groovy index 28c8be0..ba2d5bb 100644 --- a/src/main/groovy/com/devsoap/dbt/config/LedgerConfig.groovy +++ b/src/main/groovy/com/devsoap/dbt/config/LedgerConfig.groovy @@ -1,5 +1,6 @@ package com.devsoap.dbt.config class LedgerConfig { - String url = 'http://localhost:5050/ledger' + String path = 'ledger' + String remoteUrl } diff --git a/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy b/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy new file mode 100644 index 0000000..fb4bf5f --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy @@ -0,0 +1,82 @@ +package com.devsoap.dbt.data + +import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.databind.JsonNode +import groovy.transform.ToString + +import java.nio.charset.StandardCharsets +import java.security.MessageDigest + +@ToString +class BlockTransaction implements Serializable { + + String id = UUID.randomUUID().toString().replace('-','') + + List queries = [] + + boolean completed = false + + boolean executed = false + + boolean rolledback = false + + /** + * Executes a query in the transaction + * + * @param query + * the query to execute + * @return + * the result of the query + */ + void execute(String query) { + if(queries.isEmpty()){ + queries << new Query(this, query) + } else { + queries << new Query(queries.last(), query) + } + } + + /** + * End the current transaction + * + * @return + */ + void end() { + completed = true + } + + // A block in the chain + @ToString + static final class Query implements Serializable { + String query + String id + String parent + long timeStamp + + Map result + + Query() { + // For serialization + } + + Query(Query previous, String query) { + this.query = query + timeStamp = new Date().getTime() + parent = previous.id + id = generateHash() + } + + Query(BlockTransaction transaction, String query) { + this.query = query + timeStamp = new Date().getTime() + parent = transaction.id + id = generateHash() + } + + final String generateHash() { + def digest = MessageDigest.getInstance('SHA-256') + def hash = digest.digest("${parent?:''}$timeStamp$query".getBytes(StandardCharsets.UTF_8)) + hash.encodeHex().toString() + } + } +} diff --git a/src/main/groovy/com/devsoap/dbt/data/LedgerData.groovy b/src/main/groovy/com/devsoap/dbt/data/LedgerData.groovy new file mode 100644 index 0000000..8056eee --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/data/LedgerData.groovy @@ -0,0 +1,6 @@ +package com.devsoap.dbt.data + +class LedgerData implements Serializable { + + List transactions = new ArrayList<>() +} diff --git a/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy b/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy index 33f6a90..0ddea49 100644 --- a/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy +++ b/src/main/groovy/com/devsoap/dbt/handlers/ExecutorHandler.groovy @@ -1,10 +1,10 @@ package com.devsoap.dbt.handlers -import com.devsoap.dbt.BlockTransaction +import com.devsoap.dbt.data.BlockTransaction import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ArrayNode -import groovy.util.logging.Log +import groovy.util.logging.Slf4j import ratpack.exec.Promise import ratpack.handling.Context import ratpack.handling.Handler @@ -14,11 +14,9 @@ import ratpack.jdbctx.Transaction import javax.sql.DataSource import java.sql.ResultSet -@Log +@Slf4j class ExecutorHandler implements Handler { - static final String PATH = 'executor' - @Override void handle(Context ctx) throws Exception { ctx.request.body.then { body -> @@ -27,7 +25,7 @@ class ExecutorHandler implements Handler { def transaction = mapper.readValue(body.text, BlockTransaction) if(!validateChain(transaction)) { - ctx.response.status = Status.of(400, 'Transaction chain invalid') + ctx.response.status(Status.of(400, 'Transaction chain invalid')) return } @@ -38,25 +36,38 @@ class ExecutorHandler implements Handler { } } - boolean validateChain(BlockTransaction transaction) { - //FIXME + private static boolean validateChain(BlockTransaction transaction) { + if(transaction.queries[0].parent != transaction.id) { + return false + } + for(int i=1; i executeCommands(DataSource ds, ObjectMapper mapper, BlockTransaction transaction) { + private static Promise executeCommands(DataSource ds, ObjectMapper mapper, BlockTransaction transaction) { def txDs = Transaction.dataSource(ds) def tx = Transaction.create { ds.connection } tx.wrap { Promise.sync { transaction.queries.each { block -> - log.info "Executing $block.data ..." - if(block.data.toLowerCase().startsWith("select")){ + log.info "Executing $block.query ..." + if(block.query.toLowerCase().startsWith("select")){ + log.info('Saving result from Select query') def result = txDs.connection .createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE) - .executeQuery(block.data) - block.result = toJson(mapper, result) + .executeQuery(block.query) + block.result = toMap(result) } else { - txDs.connection.createStatement().execute(block.data) + txDs.connection.createStatement().execute(block.query) } } transaction @@ -64,30 +75,26 @@ class ExecutorHandler implements Handler { } } - private static JsonNode toJson(ObjectMapper mapper, ResultSet resultSet) { - def json = mapper.createObjectNode() + private static Map toMap(ResultSet resultSet) { + def map = [:] if(resultSet.last()) { - int rows = resultSet.row - log.info("Converting $rows rows to json") resultSet.beforeFirst() - resultSet.metaData.columnCount.times { column -> def columnIndex = column + 1 def columnName = resultSet.metaData.getColumnName(columnIndex) - ArrayNode columnValue = json.get(columnName) - if(!columnValue) { - columnValue = mapper.createArrayNode() - json.set(columnName, columnValue) + + def columnValues = map[columnName] as List + if(columnValues == null) { + map[columnName] = columnValues = [] } resultSet.beforeFirst() while(resultSet.next()) { - columnValue.addPOJO(resultSet.getObject(columnIndex)) + columnValues << resultSet.getObject(columnIndex) } } } - //mapper.writeValueAsString(json) - mapper.valueToTree(json) + map } } diff --git a/src/main/groovy/com/devsoap/dbt/handlers/LedgerGetTransactionHandler.groovy b/src/main/groovy/com/devsoap/dbt/handlers/LedgerGetTransactionHandler.groovy new file mode 100644 index 0000000..9ae559e --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/handlers/LedgerGetTransactionHandler.groovy @@ -0,0 +1,34 @@ +package com.devsoap.dbt.handlers + +import com.devsoap.dbt.services.LedgerService +import groovy.util.logging.Log +import groovy.util.logging.Slf4j +import ratpack.handling.Context +import ratpack.handling.Handler +import ratpack.http.HttpMethod +import ratpack.jackson.Jackson +import ratpack.session.Session + +@Slf4j +class LedgerGetTransactionHandler implements Handler { + + @Override + void handle(Context ctx) throws Exception { + ctx.with { + if(request.method == HttpMethod.GET && header('X-Transaction-Id').present) { + def id = request.headers['X-Transaction-Id'].toString() + def ledgerService = ctx.get(LedgerService) + def session = ctx.get(Session) + ledgerService.fetchTransaction(session, id).then { + if(it.present) { + render(Jackson.json(it.get())) + } else { + notFound() + } + } + } else { + next() + } + } + } +} diff --git a/src/main/groovy/com/devsoap/dbt/handlers/LedgerHandler.groovy b/src/main/groovy/com/devsoap/dbt/handlers/LedgerHandler.groovy deleted file mode 100644 index 53e6064..0000000 --- a/src/main/groovy/com/devsoap/dbt/handlers/LedgerHandler.groovy +++ /dev/null @@ -1,51 +0,0 @@ -package com.devsoap.dbt.handlers - -import com.devsoap.dbt.BlockTransaction -import com.devsoap.dbt.services.LedgerService -import com.fasterxml.jackson.databind.ObjectMapper -import groovy.util.logging.Log -import ratpack.handling.Context -import ratpack.handling.Handler - -@Log -class LedgerHandler implements Handler { - - static final String PATH = 'ledger' - - @Override - void handle(Context ctx) { - def ledgerService = ctx.get(LedgerService) - - ctx.byMethod { - delegate = it - - get({ - def transaction = ledgerService.fetchTransaction(ctx.request.headers['X-Transaction-Id'].toString()) - def mapper = ctx.get(ObjectMapper) - ctx.response.send(mapper.writeValueAsString(transaction)) - } as Handler) - - post({ - ctx.request.body.then { body -> - def mapper = ctx.get(ObjectMapper) - def transaction = mapper.readValue(body.text, BlockTransaction) - - def existingTransaction = ledgerService.fetchTransaction(transaction.id) - if(existingTransaction) { - ledgerService.updateTransaction(transaction) - } else { - log.info("Creating new transaction") - ledgerService.newTransaction(transaction) - } - if(transaction.completed){ - log.info("Sending transaction $transaction.id to executor") - ctx.redirect(ExecutorHandler.PATH) - } else { - ctx.response.send(mapper.writeValueAsString(transaction)) - } - } - } as Handler) - } - } - -} diff --git a/src/main/groovy/com/devsoap/dbt/handlers/LedgerListTransactionsHandler.groovy b/src/main/groovy/com/devsoap/dbt/handlers/LedgerListTransactionsHandler.groovy new file mode 100644 index 0000000..de233df --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/handlers/LedgerListTransactionsHandler.groovy @@ -0,0 +1,29 @@ +package com.devsoap.dbt.handlers + +import com.devsoap.dbt.services.LedgerService +import groovy.util.logging.Slf4j +import ratpack.handling.Context +import ratpack.handling.Handler +import ratpack.http.HttpMethod +import ratpack.jackson.Jackson +import ratpack.session.Session + +@Slf4j +class LedgerListTransactionsHandler implements Handler { + + @Override + void handle(Context ctx) throws Exception { + ctx.with { + if(request.method == HttpMethod.GET && !header('X-Transaction-Id').present) { + def session = get(Session) + log.info("Listing transactions in session $session.id") + def ledgerService = get(LedgerService) + ledgerService.allTransactions(session).then { + render(Jackson.json(it)) + } + } else { + next() + } + } + } +} diff --git a/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy b/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy new file mode 100644 index 0000000..b54e5ec --- /dev/null +++ b/src/main/groovy/com/devsoap/dbt/handlers/LedgerUpdateTransactionHandler.groovy @@ -0,0 +1,75 @@ +package com.devsoap.dbt.handlers + +import com.devsoap.dbt.config.DBTConfig +import com.devsoap.dbt.data.BlockTransaction +import com.devsoap.dbt.data.LedgerData +import com.devsoap.dbt.services.LedgerService +import com.fasterxml.jackson.databind.ObjectMapper +import groovy.util.logging.Log +import groovy.util.logging.Slf4j +import ratpack.config.ConfigData +import ratpack.handling.Context +import ratpack.handling.Handler +import ratpack.http.HttpMethod +import ratpack.jackson.Jackson +import ratpack.server.ServerConfig +import ratpack.session.Session + +import javax.inject.Inject + +@Slf4j +class LedgerUpdateTransactionHandler implements Handler { + + private final String executorUrl + + @Inject + LedgerUpdateTransactionHandler(DBTConfig config) { + executorUrl = config.executor.remoteUrl + } + + @Override + void handle(Context ctx) throws Exception { + ctx.with { + if(ctx.request.method == HttpMethod.POST) { + if(!executorUrl) { + throw new RuntimeException("Executor URL is not set, cannot update transaction") + } + + def ledgerService = get(LedgerService) + def session = get(Session) + request.body.then { body -> + def mapper = get(ObjectMapper) + def transaction = mapper.readValue(body.text, BlockTransaction) + log.info("Recieved transaction $transaction.id") + ledgerService.fetchTransaction(session,transaction.id).then { + if(it.present) { + log.info "Transaction $transaction.id exists, updating transaction" + ledgerService.updateTransaction(session, transaction).then { + log.info("Transaction $it updated in ledger") + if(transaction.completed){ + log.info("Sending transaction $transaction.id to executor at $executorUrl") + redirect(executorUrl) + } else { + render(Jackson.json(transaction)) + } + } + } else { + log.info("Creating new transaction") + ledgerService.newTransaction(session, transaction).then { + log.info("Transaction $it added to ledger") + if(transaction.completed){ + log.info("Sending transaction $transaction.id to executor at $executorUrl") + redirect(executorUrl) + } else { + render(Jackson.json(transaction)) + } + } + } + } + } + } else { + next() + } + } + } +} diff --git a/src/main/groovy/com/devsoap/dbt/services/LedgerService.groovy b/src/main/groovy/com/devsoap/dbt/services/LedgerService.groovy index 1114d95..cfc7bf3 100644 --- a/src/main/groovy/com/devsoap/dbt/services/LedgerService.groovy +++ b/src/main/groovy/com/devsoap/dbt/services/LedgerService.groovy @@ -1,33 +1,35 @@ package com.devsoap.dbt.services -import com.devsoap.dbt.BlockTransaction -import groovy.util.logging.Log +import com.devsoap.dbt.data.BlockTransaction +import com.devsoap.dbt.data.LedgerData +import groovy.util.logging.Slf4j +import ratpack.exec.Promise import ratpack.service.Service -import ratpack.service.StartEvent -import ratpack.service.StopEvent +import ratpack.session.Session -@Log +@Slf4j class LedgerService implements Service { - static final transient List transactions = [] + private static final LedgerData data = new LedgerData() - BlockTransaction fetchTransaction(String transactionId) { - log.info("Fetching transaction $transactionId") - log.info("Transactions:$transactions") - transactions.find {it.id == transactionId} + Promise> fetchTransaction(Session session, String transactionId) { + Promise.value(Optional.ofNullable(data.transactions.find {it.id == transactionId})) } - String newTransaction(BlockTransaction transaction) { - log.info("Adding new transaction $transaction.id") - transactions << transaction - transaction.id + Promise> allTransactions(Session session) { + Promise.value(data.transactions) } - String updateTransaction(BlockTransaction transaction) { - log.info("Updating transaction $transaction.id") - def existingTransaction = fetchTransaction(transaction.id) - def index = transactions.indexOf(existingTransaction) - transactions.remove(index) - transactions.add(index, transaction) + Promise newTransaction(Session session, BlockTransaction transaction) { + log.info("Adding new transaction $transaction.id to session ${session.id}") + data.transactions.add(transaction) + Promise.value(transaction.id) + } + + Promise updateTransaction(Session session, BlockTransaction transaction) { + log.info("Updating transaction $transaction.id in session ${session.id}") + data.transactions.removeAll {it.id == transaction.id} + data.transactions.add(transaction) + Promise.value(transaction.id) } } diff --git a/src/main/groovy/com/devsoap/dbt/DBTManager.groovy b/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy similarity index 50% rename from src/main/groovy/com/devsoap/dbt/DBTManager.groovy rename to src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy index 954c859..03c0d63 100644 --- a/src/main/groovy/com/devsoap/dbt/DBTManager.groovy +++ b/src/main/groovy/com/devsoap/dbt/services/TransactionManagerService.groovy @@ -1,50 +1,63 @@ -package com.devsoap.dbt +package com.devsoap.dbt.services import com.devsoap.dbt.config.DBTConfig +import com.devsoap.dbt.data.BlockTransaction import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper -import groovy.util.logging.Log +import groovy.util.logging.Slf4j import ratpack.exec.Promise +import ratpack.http.Status import ratpack.http.client.HttpClient import ratpack.service.Service import javax.inject.Inject -@Log -class DBTManager implements Service { - - private final String ledgerUrl +@Slf4j +class TransactionManagerService implements Service { private final HttpClient httpClient private final ObjectMapper mapper + private final DBTConfig config @Inject - DBTManager(DBTConfig config, HttpClient httpClient, ObjectMapper mapper){ - ledgerUrl = config.ledger.url + TransactionManagerService(DBTConfig config, HttpClient httpClient, ObjectMapper mapper){ + this.config = config this.httpClient = httpClient this.mapper = mapper } Promise execute(ExecuteQuery queryBuilder) { - log.info("Executing new transaction") - def builder = new TransactionBuilder(this) + if(!config.ledger.remoteUrl) { + throw new RuntimeException("Ledger remote url is not set, cannot execute query") + } + + def builder = new TransactionBuilder() queryBuilder.build(builder) def transaction = builder.build() - log.info("Sending transaction $transaction.id to ledger") - httpClient.post(ledgerUrl.toURI(), { spec -> + log.info("Sending transaction $transaction.id to ledger at $config.ledger.remoteUrl") + httpClient.post(config.ledger.remoteUrl.toURI(), { spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }).flatMap { response -> - Promise.value(mapper.readTree(response.body.text)) + }).onError { + log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl") + }.map { response -> + if(response.status == Status.OK) { + mapper.readTree(response.body.text) + } } } Promise execute(String transactionId, ExecuteQuery queryBuilder) { - log.info("Amending existing transaction $transactionId") + if(!config.ledger.remoteUrl) { + throw new RuntimeException("Ledger remote url is not set, cannot execute query") + } - log.info("Getting transaction $transactionId from ledger") - httpClient.get(ledgerUrl.toURI(), { spec -> + log.info("Sending transaction $transactionId to ledger at $config.ledger.remoteUrl") + httpClient.get(config.ledger.remoteUrl.toURI(), { spec -> spec.headers.add('X-Transaction-Id', transactionId) }).flatMap { response -> + if(response.status != Status.OK) { + throw new RuntimeException("Server returned ${response.statusCode} ${response.status.message}") + } def oldTransaction = mapper.readValue(response.body.text, BlockTransaction) if(oldTransaction == null) { throw new RuntimeException("Transaction with id $transactionId could not be found") @@ -53,7 +66,8 @@ class DBTManager implements Service { throw new RuntimeException("Cannot modify a completed transaction") } - def builder = new TransactionBuilder(this, oldTransaction) + log.info("Updating transaction $transactionId content with new query") + def builder = new TransactionBuilder(oldTransaction) queryBuilder.build(builder) def transaction = builder.build() @@ -61,12 +75,14 @@ class DBTManager implements Service { throw new RuntimeException("Transaction id changed") } - log.info("Sending transaction $transactionId to ledger") - httpClient.post(ledgerUrl.toURI(), { spec -> + log.info("Sending updated transaction $transaction.id to ledger at $config.ledger.remoteUrl") + httpClient.post(config.ledger.remoteUrl.toURI(), { spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }) - }.flatMap { response -> - Promise.value(mapper.readTree(response.body.text)) + }).onError { + log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl") + } + }.map { response -> + mapper.readTree(response.body.text) } } @@ -74,16 +90,13 @@ class DBTManager implements Service { private final List queries = [] - private final DBTManager manager - private final BlockTransaction transaction - private TransactionBuilder(DBTManager manager){ - this(manager, new BlockTransaction()) + private TransactionBuilder() { + this.transaction = new BlockTransaction() } - private TransactionBuilder(DBTManager manager, BlockTransaction transaction) { - this.manager = manager + private TransactionBuilder(BlockTransaction transaction) { this.transaction = transaction } diff --git a/src/ratpack/Ratpack.groovy b/src/ratpack/Ratpack.groovy index 6ca0464..3510eef 100644 --- a/src/ratpack/Ratpack.groovy +++ b/src/ratpack/Ratpack.groovy @@ -1,11 +1,13 @@ +import com.devsoap.dbt.DBTModule import com.devsoap.dbt.app.DatabaseService import com.devsoap.dbt.config.DBTConfig -import com.devsoap.dbt.DBTManager import com.devsoap.dbt.handlers.ExecutorHandler -import com.devsoap.dbt.handlers.LedgerHandler -import com.devsoap.dbt.services.LedgerService + +import com.devsoap.dbt.services.TransactionManagerService import com.fasterxml.jackson.databind.ObjectMapper import org.h2.jdbcx.JdbcDataSource +import ratpack.config.ConfigData +import ratpack.session.SessionModule import javax.sql.DataSource @@ -14,41 +16,29 @@ import static ratpack.groovy.Groovy.ratpack ratpack { serverConfig { - - /** - * DBT Framework config - */ - require("", DBTConfig) + sysProps() + require('/dbt', DBTConfig) } bindings { - bindInstance(new ObjectMapper()) + + module SessionModule + module (DBTModule) { + it.ledger.remoteUrl = 'http://localhost:5050/ledger' + it.executor.remoteUrl = 'http://localhost:5050/executor' + } + bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: '')) - bindInstance(new DatabaseService()) - - - /** - * DBT Framework manager - */ - bind(DBTManager) - bind(LedgerService) - bind(ExecutorHandler) - bind(LedgerHandler) + bind DatabaseService } handlers { - /** - * DBT Framework handlers - */ - path('executor', ExecutorHandler) - path('ledger', LedgerHandler) - /** * Consumer services */ get('frontend') { - get(DBTManager).execute { transaction -> + get(TransactionManagerService).execute { transaction -> transaction.query("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (${new Random().nextInt()}, 'HELLO')") }.then { redirect("/gateway/${it['id'].textValue()}") @@ -56,7 +46,7 @@ ratpack { } get('gateway/:transactionId?') { - get(DBTManager).execute(pathTokens.transactionId, { transaction -> + get(TransactionManagerService).execute(pathTokens.transactionId, { transaction -> transaction.query("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (${new Random().nextInt()}, 'WORLD')") }).then { redirect("/gateway2/${it['id'].textValue()}") @@ -64,7 +54,7 @@ ratpack { } get('gateway2/:transactionId?') { - get(DBTManager).execute(pathTokens.transactionId, { transaction -> + get(TransactionManagerService).execute(pathTokens.transactionId, { transaction -> transaction.query("SELECT * FROM LOGS") transaction.complete() }).then { diff --git a/src/test/groovy/com/devsoap/dbt/framework/CustomPortMainApplicationUnderTest.groovy b/src/test/groovy/com/devsoap/dbt/framework/CustomPortMainApplicationUnderTest.groovy new file mode 100644 index 0000000..e3e4009 --- /dev/null +++ b/src/test/groovy/com/devsoap/dbt/framework/CustomPortMainApplicationUnderTest.groovy @@ -0,0 +1,31 @@ +package com.devsoap.dbt.framework + +import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest +import ratpack.impose.Impositions +import ratpack.impose.ServerConfigImposition +import ratpack.server.RatpackServer + +class CustomPortMainApplicationUnderTest extends GroovyRatpackMainApplicationUnderTest { + + private final int port + + CustomPortMainApplicationUnderTest(int port) { + this.port = port + } + + @Override + protected Impositions createImpositions() throws Exception { + Impositions.of { + it.add(ServerConfigImposition.of { + it.port(port) + }) + } + } + + @Override + protected RatpackServer createServer() throws Exception { + System.setProperty('ratpack.dbt.ledger.remoteUrl', "http://localhost:$port/ledger") + System.setProperty('ratpack.dbt.executor.remoteUrl', "http://localhost:$port/executor") + return super.createServer() + } +} diff --git a/src/test/groovy/com/devsoap/dbt/framework/ExecutorSpec.groovy b/src/test/groovy/com/devsoap/dbt/framework/ExecutorSpec.groovy index 27accf4..5c6d807 100644 --- a/src/test/groovy/com/devsoap/dbt/framework/ExecutorSpec.groovy +++ b/src/test/groovy/com/devsoap/dbt/framework/ExecutorSpec.groovy @@ -1,7 +1,7 @@ package com.devsoap.dbt.framework +import com.devsoap.dbt.data.BlockTransaction import com.fasterxml.jackson.databind.ObjectMapper -import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest import spock.lang.AutoCleanup import spock.lang.Specification @@ -9,8 +9,26 @@ class ExecutorSpec extends Specification { def mapper = new ObjectMapper() + def PATH = 'executor' + @AutoCleanup - def aut = new GroovyRatpackMainApplicationUnderTest() + def aut = new CustomPortMainApplicationUnderTest(8888) + + void 'transaction sent to executor'() { + setup: + def transaction = new BlockTransaction() + transaction.execute("SELECT * FROM LOGS") + transaction.end() + when: + String json = aut.httpClient.requestSpec{ spec -> + spec.body.text(mapper.writeValueAsString(transaction)) + }.postText(PATH) + def recievedTransaction = mapper.readValue(json, BlockTransaction) + then: + recievedTransaction.id == transaction.id + recievedTransaction.completed + recievedTransaction.executed + } diff --git a/src/test/groovy/com/devsoap/dbt/framework/LedgerSpec.groovy b/src/test/groovy/com/devsoap/dbt/framework/LedgerSpec.groovy index e7f5f0e..4e7f095 100644 --- a/src/test/groovy/com/devsoap/dbt/framework/LedgerSpec.groovy +++ b/src/test/groovy/com/devsoap/dbt/framework/LedgerSpec.groovy @@ -1,32 +1,40 @@ package com.devsoap.dbt.framework -import com.devsoap.dbt.BlockTransaction -import com.devsoap.dbt.handlers.LedgerHandler +import com.devsoap.dbt.data.BlockTransaction + import com.fasterxml.jackson.databind.ObjectMapper import groovy.json.JsonSlurper import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest +import ratpack.impose.Imposition +import ratpack.impose.Impositions +import ratpack.impose.ImpositionsSpec +import ratpack.impose.ServerConfigImposition +import ratpack.server.RatpackServer import spock.lang.AutoCleanup +import spock.lang.Shared import spock.lang.Specification class LedgerSpec extends Specification { def mapper = new ObjectMapper() - def jsonSlurper = new JsonSlurper() + + def PATH = 'ledger' @AutoCleanup - def aut = new GroovyRatpackMainApplicationUnderTest() + GroovyRatpackMainApplicationUnderTest aut = new CustomPortMainApplicationUnderTest(8888) void 'transaction sent to ledger'() { setup: def transaction = new BlockTransaction() transaction.execute("SELECT * FROM LOGS") when: - def response = mapper.readValue(aut.httpClient.requestSpec { spec -> + String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }.post(LedgerHandler.PATH).body.text, BlockTransaction) + }.postText(PATH) + def recievedTransaction = mapper.readValue(json, BlockTransaction) then: - response.id == transaction.id - response.completed == false + recievedTransaction.id == transaction.id + !recievedTransaction.completed } void 'completed transaction marked as completed'() { @@ -35,12 +43,13 @@ class LedgerSpec extends Specification { transaction.execute("SELECT * FROM LOGS") transaction.end() when: - def response = mapper.readValue(aut.httpClient.requestSpec { spec -> + String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }.post(LedgerHandler.PATH).body.text, BlockTransaction) + }.postText(PATH) + def recievedTransaction = mapper.readValue(json, BlockTransaction) then: - response.id == transaction.id - response.completed == true + recievedTransaction.id == transaction.id + recievedTransaction.completed } void 'completed transaction sent to executor from ledger'() { @@ -51,7 +60,7 @@ class LedgerSpec extends Specification { when: def response = mapper.readValue(aut.httpClient.requestSpec { spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }.post(LedgerHandler.PATH).body.text, BlockTransaction) + }.post(PATH).body.text, BlockTransaction) then: response.id == transaction.id response.executed == true @@ -66,13 +75,16 @@ class LedgerSpec extends Specification { transaction.execute("SELECT * FROM LOGS") transaction.end() when: - def response = mapper.readValue(aut.httpClient.requestSpec { spec -> + String json = aut.httpClient.requestSpec{ spec -> spec.body.text(mapper.writeValueAsString(transaction)) - }.post(LedgerHandler.PATH).body.text, BlockTransaction) - def json = jsonSlurper.parseText(response.queries[1].result) + }.postText(PATH) + def recievedTransaction = mapper.readValue(json, BlockTransaction) + def result = recievedTransaction.queries[1].result then: - response.id == transaction.id - json.LOG_ID.first() == 1 - json.LOG_VALUE.first() == 'HELLO' + recievedTransaction.id == transaction.id + recievedTransaction.queries.first().result == null // insert query has no result + + result['LOG_ID'].first() == 1 + result['LOG_VALUE'].first() == 'HELLO' } } diff --git a/src/test/groovy/com/devsoap/dbt/framework/TransactionManagementServiceSpec.groovy b/src/test/groovy/com/devsoap/dbt/framework/TransactionManagementServiceSpec.groovy new file mode 100644 index 0000000..14ef24c --- /dev/null +++ b/src/test/groovy/com/devsoap/dbt/framework/TransactionManagementServiceSpec.groovy @@ -0,0 +1,33 @@ +package com.devsoap.dbt.framework + +import com.devsoap.dbt.data.BlockTransaction +import com.fasterxml.jackson.databind.ObjectMapper +import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest +import spock.lang.AutoCleanup +import spock.lang.Specification + +class TransactionManagementServiceSpec extends Specification { + + + def mapper = new ObjectMapper() + + def PATH = 'ledger' + + @AutoCleanup + GroovyRatpackMainApplicationUnderTest aut = new CustomPortMainApplicationUnderTest(8888) + + void 'transaction sent to ledger'() { + setup: + def transaction = new BlockTransaction() + transaction.execute("SELECT * FROM LOGS") + when: + String json = aut.httpClient.requestSpec{ spec -> + spec.body.text(mapper.writeValueAsString(transaction)) + }.postText(PATH) + def recievedTransaction = mapper.readValue(json, BlockTransaction) + then: + recievedTransaction.id == transaction.id + !recievedTransaction.completed + } + +}