1

Fix tests

This commit is contained in:
2018-05-05 16:05:06 +03:00
parent 2ad773bc56
commit 880e159fe8
8 changed files with 91 additions and 86 deletions

View File

@@ -35,11 +35,11 @@ class BlockTransaction implements Serializable {
} }
/** /**
* End the current transaction * Commit the transaction to database
* *
* @return * @return
*/ */
void end() { void commit() {
completed = true completed = true
} }

View File

@@ -5,10 +5,12 @@ import com.devsoap.dbt.data.BlockTransaction
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import groovy.util.logging.Slf4j import groovy.util.logging.Slf4j
import ratpack.exec.Promise import ratpack.exec.Promise
import ratpack.func.Pair
import ratpack.handling.Context import ratpack.handling.Context
import ratpack.handling.Handler import ratpack.handling.Handler
import ratpack.http.Status import ratpack.http.Status
import ratpack.http.client.HttpClient import ratpack.http.client.HttpClient
import ratpack.http.client.ReceivedResponse
import ratpack.jdbctx.Transaction import ratpack.jdbctx.Transaction
import javax.inject.Inject import javax.inject.Inject
@@ -41,24 +43,24 @@ class ExecutorHandler implements Handler {
return return
} }
executeCommands(ds, transaction).then { executeCommands(ds, transaction).onError { e ->
transaction.executed = true log.info("Sending rolled back transaction to ledger")
println mapper.writeValueAsString(transaction)
// Notify ledger of result client.post(config.ledger.remoteUrl.toURI(), { spec ->
spec.body.text(mapper.writeValueAsString(transaction))
}).then {
ctx.error(e)
}
}.then {
log.info("Updating ledger with execution result") log.info("Updating ledger with execution result")
client.post(config.ledger.remoteUrl.toURI(), { spec -> client.post(config.ledger.remoteUrl.toURI(), { spec ->
spec.body.text(mapper.writeValueAsString(transaction)) spec.body.text(mapper.writeValueAsString(transaction))
}).then { }).then {
if(it.status != Status.OK) {
log.error("Failed to update ledger with execution result for transaction $transaction.id")
}
}
// Return transaction with result
ctx.response.send(mapper.writeValueAsString(transaction)) ctx.response.send(mapper.writeValueAsString(transaction))
} }
} }
} }
}
private static boolean validateChain(BlockTransaction transaction) { private static boolean validateChain(BlockTransaction transaction) {
if(transaction.queries[0].parent != transaction.id) { if(transaction.queries[0].parent != transaction.id) {
@@ -81,17 +83,16 @@ class ExecutorHandler implements Handler {
def txDs = Transaction.dataSource(ds) def txDs = Transaction.dataSource(ds)
def tx = Transaction.create { ds.connection } def tx = Transaction.create { ds.connection }
tx.wrap { tx.wrap {
Promise.sync {
try { try {
transaction.queries.each { block -> transaction.queries.each { block ->
try{ try{
log.info "Executing $block.query ..."
if(block.query.toLowerCase().startsWith("select")){
log.info('Saving result from Select query')
def result = txDs.connection def result = txDs.connection
.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE) .createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE)
.executeQuery(block.query) .executeQuery(block.query)
block.result = toMap(result) block.result = toMap(result)
log.info "Executing $block.query ..."
if(block.query.toLowerCase().startsWith("select")){
log.info('Saving result from Select query')
} else { } else {
txDs.connection.createStatement().execute(block.query) txDs.connection.createStatement().execute(block.query)
} }
@@ -100,12 +101,13 @@ class ExecutorHandler implements Handler {
throw e throw e
} }
} }
transaction.executed = true
Promise.sync { transaction }
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to execute transaction $transaction.id, transaction rolled back", e) log.error("Failed to execute transaction $transaction.id, transaction rolled back", e)
tx.rollback() tx.rollback()
transaction.rolledback = true transaction.rolledback = true
} Promise.error(e)
transaction
} }
} }
} }

View File

@@ -5,6 +5,7 @@ import com.devsoap.dbt.data.BlockTransaction
import com.devsoap.dbt.services.LedgerService import com.devsoap.dbt.services.LedgerService
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import groovy.util.logging.Slf4j import groovy.util.logging.Slf4j
import ratpack.exec.Promise
import ratpack.handling.Context import ratpack.handling.Context
import ratpack.handling.Handler import ratpack.handling.Handler
import ratpack.http.HttpMethod import ratpack.http.HttpMethod
@@ -24,45 +25,55 @@ class LedgerUpdateTransactionHandler implements Handler {
@Override @Override
void handle(Context ctx) throws Exception { void handle(Context ctx) throws Exception {
ctx.with { if (ctx.request.method != HttpMethod.POST) {
if(ctx.request.method == HttpMethod.POST) { ctx.next()
if(!config.executor.remoteUrl) { return
}
if (!config.executor.remoteUrl) {
throw new RuntimeException("Executor URL is not set, cannot update transaction") throw new RuntimeException("Executor URL is not set, cannot update transaction")
} }
def ledgerService = get(LedgerService) def ledgerService = ctx.get(LedgerService)
request.body.then { body -> ctx.request.body.then { body ->
def mapper = get(ObjectMapper) def mapper = ctx.get(ObjectMapper)
def transaction = mapper.readValue(body.text, BlockTransaction) BlockTransaction transaction = mapper.readValue(body.text, BlockTransaction)
log.info("Recieved transaction $transaction.id") log.info("Recieved transaction $transaction.id")
ledgerService.fetchTransaction(transaction.id).then { ledgerService.fetchTransaction(transaction.id).then { Optional<BlockTransaction> t ->
if(it.present) { t.present ? updateTransaction(ctx, transaction) : newTransaction(ctx, transaction)
}
}
}
private void updateTransaction(Context ctx, BlockTransaction transaction) {
def ledgerService = ctx.get(LedgerService)
log.info "Transaction $transaction.id exists, updating transaction" log.info "Transaction $transaction.id exists, updating transaction"
ledgerService.updateTransaction(transaction).then { ledgerService.updateTransaction(transaction).then {
log.info("Transaction $it updated in ledger") log.info("Transaction $it updated in ledger")
if(transaction.completed && !transaction.executed){ if (transaction.completed & !(transaction.executed || transaction.rolledback)) {
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
redirect(config.executor.remoteUrl) ctx.redirect(config.executor.remoteUrl)
} else { } else {
render(Jackson.json(transaction)) ctx.render(Jackson.json(transaction))
} }
} }
} else { }
private void newTransaction(Context ctx, BlockTransaction transaction) {
if(transaction.executed || transaction.rolledback) {
log.error("Tried to create a already executed transaction $transaction.id")
throw new IllegalArgumentException("Cannot create a transaction with executed or rolledback status")
}
def ledgerService = ctx.get(LedgerService)
log.info("Creating new transaction") log.info("Creating new transaction")
ledgerService.newTransaction(transaction).then { ledgerService.newTransaction(transaction).then {
log.info("Transaction $it added to ledger") log.info("Transaction $it added to ledger")
if(transaction.completed && !transaction.executed){ if(transaction.completed){
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl") log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
redirect(config.executor.remoteUrl) ctx.redirect(config.executor.remoteUrl)
} else { } else {
render(Jackson.json(transaction)) ctx.render(Jackson.json(transaction))
}
}
}
}
}
} else {
next()
} }
} }
} }

View File

@@ -128,7 +128,7 @@ class TransactionManagerService implements Service {
} }
TransactionBuilder complete() { TransactionBuilder complete() {
transaction.end() transaction.commit()
this this
} }

View File

@@ -1,13 +1,14 @@
apply plugin: 'org.flywaydb.flyway' apply plugin: 'org.flywaydb.flyway'
dependencies { dependencies {
testCompile project(':dbt-core') compile project(':dbt-core')
testCompile ratpack.dependency('h2') runtime ratpack.dependency('handlebars')
testCompile ratpack.dependency('jdbc-tx') runtime ratpack.dependency('h2')
runtime ratpack.dependency('jdbc-tx')
testCompile 'org.flywaydb:flyway-core:5.0.7' compile 'org.flywaydb:flyway-core:5.0.7'
testRuntime 'org.slf4j:slf4j-simple:1.7.25' runtime 'org.slf4j:slf4j-simple:1.7.25'
testCompile ratpack.dependency('test') testCompile ratpack.dependency('test')
testCompile 'org.spockframework:spock-core:1.0-groovy-2.4' testCompile 'org.spockframework:spock-core:1.0-groovy-2.4'

View File

@@ -18,8 +18,6 @@ ratpack {
bindings { bindings {
module SessionModule
module (DBTModule) { module (DBTModule) {
it.ledger.remoteUrl = 'http://localhost:8888/ledger' it.ledger.remoteUrl = 'http://localhost:8888/ledger'
it.executor.remoteUrl = 'http://localhost:8888/executor' it.executor.remoteUrl = 'http://localhost:8888/executor'

View File

@@ -18,7 +18,7 @@ class ExecutorSpec extends Specification {
setup: setup:
def transaction = new BlockTransaction() def transaction = new BlockTransaction()
transaction.execute("SELECT * FROM LOGS") transaction.execute("SELECT * FROM LOGS")
transaction.end() transaction.commit()
when: when:
String json = aut.httpClient.requestSpec{ spec -> String json = aut.httpClient.requestSpec{ spec ->
spec.body.text(mapper.writeValueAsString(transaction)) spec.body.text(mapper.writeValueAsString(transaction))

View File

@@ -3,15 +3,8 @@ package com.devsoap.dbt
import com.devsoap.dbt.data.BlockTransaction import com.devsoap.dbt.data.BlockTransaction
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper
import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest
import ratpack.impose.Imposition
import ratpack.impose.Impositions
import ratpack.impose.ImpositionsSpec
import ratpack.impose.ServerConfigImposition
import ratpack.server.RatpackServer
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification import spock.lang.Specification
class LedgerSpec extends Specification { class LedgerSpec extends Specification {
@@ -41,7 +34,7 @@ class LedgerSpec extends Specification {
setup: setup:
def transaction = new BlockTransaction() def transaction = new BlockTransaction()
transaction.execute("SELECT * FROM LOGS") transaction.execute("SELECT * FROM LOGS")
transaction.end() transaction.commit()
when: when:
String json = aut.httpClient.requestSpec{ spec -> String json = aut.httpClient.requestSpec{ spec ->
spec.body.text(mapper.writeValueAsString(transaction)) spec.body.text(mapper.writeValueAsString(transaction))
@@ -56,7 +49,7 @@ class LedgerSpec extends Specification {
setup: setup:
def transaction = new BlockTransaction() def transaction = new BlockTransaction()
transaction.execute("SELECT * FROM LOGS") transaction.execute("SELECT * FROM LOGS")
transaction.end() transaction.commit()
when: when:
def response = mapper.readValue(aut.httpClient.requestSpec { spec -> def response = mapper.readValue(aut.httpClient.requestSpec { spec ->
spec.body.text(mapper.writeValueAsString(transaction)) spec.body.text(mapper.writeValueAsString(transaction))
@@ -73,7 +66,7 @@ class LedgerSpec extends Specification {
def transaction = new BlockTransaction() def transaction = new BlockTransaction()
transaction.execute("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (1, 'HELLO')") transaction.execute("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (1, 'HELLO')")
transaction.execute("SELECT * FROM LOGS") transaction.execute("SELECT * FROM LOGS")
transaction.end() transaction.commit()
when: when:
String json = aut.httpClient.requestSpec{ spec -> String json = aut.httpClient.requestSpec{ spec ->
spec.body.text(mapper.writeValueAsString(transaction)) spec.body.text(mapper.writeValueAsString(transaction))