Fixed docker networking
This commit is contained in:
@@ -20,6 +20,7 @@ import com.devsoap.dbt.actions.LedgerChainAction
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.data.LedgerData
|
||||
import com.devsoap.dbt.handlers.ExecutorHandler
|
||||
import com.devsoap.dbt.handlers.ConfigInfoHandler
|
||||
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
|
||||
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
|
||||
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
|
||||
@@ -27,7 +28,6 @@ import com.devsoap.dbt.services.LedgerService
|
||||
import com.devsoap.dbt.services.TransactionManagerService
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.google.inject.multibindings.Multibinder
|
||||
import groovy.json.JsonBuilder
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.guice.ConfigurableModule
|
||||
import ratpack.handling.HandlerDecorator
|
||||
@@ -47,6 +47,7 @@ class DBTModule extends ConfigurableModule<DBTConfig> {
|
||||
|
||||
bind(ExecutorChainAction)
|
||||
bind(ExecutorHandler)
|
||||
bind(ConfigInfoHandler)
|
||||
|
||||
bind(LedgerData)
|
||||
bind(LedgerService)
|
||||
|
||||
@@ -17,9 +17,11 @@ package com.devsoap.dbt.actions
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.handlers.ExecutorHandler
|
||||
import com.devsoap.dbt.handlers.ConfigInfoHandler
|
||||
import com.google.inject.Inject
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.groovy.handling.GroovyChainAction
|
||||
import ratpack.handling.Handlers
|
||||
|
||||
@Slf4j
|
||||
class ExecutorChainAction extends GroovyChainAction {
|
||||
@@ -39,5 +41,6 @@ class ExecutorChainAction extends GroovyChainAction {
|
||||
|
||||
log.info("Registering executor at /$executorPath")
|
||||
path(executorPath, ExecutorHandler)
|
||||
path("$executorPath/config", ConfigInfoHandler)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
package com.devsoap.dbt.actions
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.handlers.ConfigInfoHandler
|
||||
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
|
||||
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
|
||||
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
|
||||
@@ -46,5 +47,6 @@ class LedgerChainAction extends GroovyChainAction {
|
||||
registry.get(LedgerUpdateTransactionHandler),
|
||||
registry.get(LedgerListTransactionsHandler)
|
||||
))
|
||||
path("$ledgerPath/config", ConfigInfoHandler)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
import ratpack.http.HttpMethod
|
||||
import ratpack.jackson.Jackson
|
||||
|
||||
import javax.inject.Inject
|
||||
|
||||
class ConfigInfoHandler implements Handler {
|
||||
|
||||
private final DBTConfig config
|
||||
|
||||
@Inject
|
||||
ConfigInfoHandler(DBTConfig config) {
|
||||
this.config = config
|
||||
}
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) throws Exception {
|
||||
if(ctx.request.method == HttpMethod.GET) {
|
||||
ctx.render Jackson.json(config)
|
||||
} else {
|
||||
ctx.next()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -53,25 +53,35 @@ class ExecutorHandler implements Handler {
|
||||
def ds = ctx.get(DataSource)
|
||||
def transaction = mapper.readValue(body.text, BlockTransaction)
|
||||
|
||||
log.info('Recieved transaction {} for execution', transaction.id)
|
||||
|
||||
if(!validateChain(transaction)) {
|
||||
ctx.response.status(Status.of(400, 'Transaction chain invalid'))
|
||||
log.error("Transaction chain validation failed for transaction {}", transaction.id)
|
||||
ctx.response.status(Status.of(400, 'Transaction chain invalid')).send()
|
||||
return
|
||||
}
|
||||
|
||||
log.info('Executing transaction {} on {}', transaction.id, ds)
|
||||
executeCommands(ds, transaction).onError { e ->
|
||||
log.info("Sending rolled back transaction to ledger")
|
||||
println mapper.writeValueAsString(transaction)
|
||||
client.post(config.ledger.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}).then {
|
||||
ctx.error(e)
|
||||
}).onError { ee ->
|
||||
log.error('Failed to reach ledger', ee)
|
||||
ctx.response.status(Status.of(404, 'Ledger not found')).send()
|
||||
}.then {
|
||||
log.error('Transaction {} rolled back', transaction.id)
|
||||
ctx.response.status(Status.of(505, 'Transaction rollback')).send()
|
||||
}
|
||||
}.then {
|
||||
log.info("Updating ledger with execution result")
|
||||
client.post(config.ledger.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}).then {
|
||||
ctx.response.send(mapper.writeValueAsString(transaction))
|
||||
}).onError { ee ->
|
||||
log.error('Failed to reach ledger', ee)
|
||||
ctx.response.status(Status.of(404, 'Ledger not found')).send()
|
||||
}.then {
|
||||
ctx.response.send(it.body.text)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,8 @@ import ratpack.exec.Promise
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
import ratpack.http.HttpMethod
|
||||
import ratpack.http.Status
|
||||
import ratpack.http.client.HttpClient
|
||||
import ratpack.jackson.Jackson
|
||||
|
||||
import javax.inject.Inject
|
||||
@@ -32,10 +34,14 @@ import javax.inject.Inject
|
||||
class LedgerUpdateTransactionHandler implements Handler {
|
||||
|
||||
private final DBTConfig config
|
||||
private final HttpClient client
|
||||
private final ObjectMapper mapper
|
||||
|
||||
@Inject
|
||||
LedgerUpdateTransactionHandler(DBTConfig config) {
|
||||
LedgerUpdateTransactionHandler(DBTConfig config, HttpClient client, ObjectMapper mapper) {
|
||||
this.config = config
|
||||
this.mapper = mapper
|
||||
this.client = client
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -71,11 +77,18 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
def transaction = cloneTransaction(oldTransaction, newTransaction)
|
||||
|
||||
def ledgerService = ctx.get(LedgerService)
|
||||
ledgerService.updateTransaction(transaction).then {
|
||||
log.info("Transaction $it updated in ledger")
|
||||
ledgerService.updateTransaction(transaction).then { id ->
|
||||
log.info("Transaction $id updated in ledger")
|
||||
if (transaction.completed & !(transaction.executed || transaction.rolledback)) {
|
||||
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
|
||||
ctx.redirect(config.executor.remoteUrl)
|
||||
log.info("Sending transaction $id to executor at $config.executor.remoteUrl")
|
||||
client.post(config.executor.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}).onError { err ->
|
||||
log.error('Failed to reach executor', err)
|
||||
ctx.response.status(Status.of(404, 'Executor not found')).send()
|
||||
}.then {
|
||||
ctx.response.send(it.body.text)
|
||||
}
|
||||
} else {
|
||||
ctx.render(Jackson.json(transaction))
|
||||
}
|
||||
@@ -92,7 +105,14 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
log.info("Transaction $it added to ledger")
|
||||
if(transaction.completed){
|
||||
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
|
||||
ctx.redirect(config.executor.remoteUrl)
|
||||
client.post(config.executor.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}).onError { err ->
|
||||
log.error('Failed to reach executor', err)
|
||||
ctx.response.status(Status.of(404, 'Executor not found')).send()
|
||||
}.then {
|
||||
ctx.response.send(it.body.text)
|
||||
}
|
||||
} else {
|
||||
ctx.render(Jackson.json(transaction))
|
||||
}
|
||||
@@ -101,14 +121,14 @@ class LedgerUpdateTransactionHandler implements Handler {
|
||||
|
||||
private static BlockTransaction cloneTransaction(BlockTransaction oldTransaction, BlockTransaction newTransaction) {
|
||||
def transaction = new BlockTransaction()
|
||||
transaction.id = oldTransaction.id
|
||||
transaction.executed = oldTransaction.executed
|
||||
transaction.completed = oldTransaction.completed
|
||||
transaction.rolledback = oldTransaction.rolledback
|
||||
transaction.id = newTransaction.id
|
||||
transaction.executed = newTransaction.executed
|
||||
transaction.completed = newTransaction.completed
|
||||
transaction.rolledback = newTransaction.rolledback
|
||||
|
||||
newTransaction.queries.each { q ->
|
||||
def query = transaction.queries.isEmpty() ?
|
||||
new BlockTransaction.Query(oldTransaction, q.query) :
|
||||
new BlockTransaction.Query(transaction, q.query) :
|
||||
new BlockTransaction.Query(transaction.queries.last(), q.query)
|
||||
query.resultError = q.resultError
|
||||
query.result = q.result
|
||||
|
||||
@@ -7,13 +7,24 @@ services:
|
||||
container_name: dbt-executor
|
||||
environment:
|
||||
- RATPACK_DBT__LEDGER__REMOTE_URL=http://ledger:5050/ledger
|
||||
ports:
|
||||
- "5051:5050"
|
||||
- RATPACK_DEVELOPMENT=false
|
||||
networks:
|
||||
dbt:
|
||||
aliases:
|
||||
- executor
|
||||
|
||||
ledger:
|
||||
image: com.devsoap/dbt-ledger
|
||||
container_name: dbt-ledger
|
||||
environment:
|
||||
- RATPACK_DBT__EXECUTOR__REMOTE_URL=http://executor:5051/executor
|
||||
- RATPACK_DBT__EXECUTOR__REMOTE_URL=http://executor:5050/executor
|
||||
- RATPACK_DEVELOPMENT=false
|
||||
ports:
|
||||
- "5050:5050"
|
||||
- "5050:5050"
|
||||
networks:
|
||||
dbt:
|
||||
aliases:
|
||||
- ledger
|
||||
|
||||
networks:
|
||||
dbt:
|
||||
Reference in New Issue
Block a user