Moarr work!
This commit is contained in:
@@ -21,6 +21,7 @@ repositories {
|
||||
dependencies {
|
||||
compile ratpack.dependency('h2')
|
||||
compile ratpack.dependency('jdbc-tx')
|
||||
compile ratpack.dependency('session')
|
||||
compile 'org.flywaydb:flyway-core:5.0.7'
|
||||
runtime 'org.slf4j:slf4j-simple:1.7.25'
|
||||
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
package com.devsoap.dbt
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.security.MessageDigest
|
||||
|
||||
class BlockTransaction implements Serializable {
|
||||
|
||||
String id = UUID.randomUUID().toString()
|
||||
|
||||
List<Query> queries = []
|
||||
|
||||
boolean completed = false
|
||||
|
||||
boolean executed = false
|
||||
|
||||
boolean rolledback = false
|
||||
|
||||
/**
|
||||
* Executes a query in the transaction
|
||||
*
|
||||
* @param query
|
||||
* the query to execute
|
||||
* @return
|
||||
* the result of the query
|
||||
*/
|
||||
void execute(String query) {
|
||||
queries << new Query(queries.empty? null : queries.last(), query)
|
||||
}
|
||||
|
||||
/**
|
||||
* End the current transaction
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
void end() {
|
||||
completed = true
|
||||
}
|
||||
|
||||
// A block in the chain
|
||||
static final class Query implements Serializable {
|
||||
String data
|
||||
String hash
|
||||
long timeStamp
|
||||
Object result
|
||||
|
||||
Query() {
|
||||
// For serialization
|
||||
}
|
||||
|
||||
Query(Query previous, String data) {
|
||||
this.data = data
|
||||
timeStamp = new Date().getTime()
|
||||
hash = generateHash(previous?.hash)
|
||||
}
|
||||
|
||||
private final String generateHash(String previousHash) {
|
||||
def digest = MessageDigest.getInstance('SHA-256')
|
||||
def hash = digest.digest("${previousHash?:''}$timeStamp$data".getBytes(StandardCharsets.UTF_8))
|
||||
hash.encodeHex().toString()
|
||||
}
|
||||
}
|
||||
}
|
||||
62
src/main/groovy/com/devsoap/dbt/DBTModule.groovy
Normal file
62
src/main/groovy/com/devsoap/dbt/DBTModule.groovy
Normal file
@@ -0,0 +1,62 @@
|
||||
package com.devsoap.dbt
|
||||
|
||||
import com.devsoap.dbt.actions.ExecutorChainAction
|
||||
import com.devsoap.dbt.actions.LedgerChainAction
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.data.LedgerData
|
||||
import com.devsoap.dbt.handlers.ExecutorHandler
|
||||
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
|
||||
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
|
||||
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
import com.devsoap.dbt.services.TransactionManagerService
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.google.inject.multibindings.Multibinder
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.guice.ConfigurableModule
|
||||
import ratpack.handling.HandlerDecorator
|
||||
import ratpack.server.ServerConfig
|
||||
import ratpack.session.Session
|
||||
|
||||
@Slf4j
|
||||
class DBTModule extends ConfigurableModule<DBTConfig> {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ObjectMapper)
|
||||
|
||||
bind(LedgerChainAction)
|
||||
bind(LedgerGetTransactionHandler)
|
||||
bind(LedgerListTransactionsHandler)
|
||||
bind(LedgerUpdateTransactionHandler)
|
||||
|
||||
bind(ExecutorChainAction)
|
||||
bind(ExecutorHandler)
|
||||
|
||||
bind(LedgerData)
|
||||
|
||||
bind(LedgerService)
|
||||
bind(TransactionManagerService)
|
||||
|
||||
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
|
||||
.toInstance(HandlerDecorator.prependHandlers(LedgerChainAction))
|
||||
|
||||
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
|
||||
.toInstance(HandlerDecorator.prependHandlers(ExecutorChainAction))
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DBTConfig createConfig(ServerConfig serverConfig) {
|
||||
(DBTConfig) serverConfig.getAsConfigObject('/dbt', DBTConfig)?.getObject() ?: super.createConfig(serverConfig)
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) {
|
||||
if(!config.executor.remoteUrl) {
|
||||
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.executor.path}"
|
||||
}
|
||||
if(!config.ledger.remoteUrl) {
|
||||
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.ledger.path}"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.devsoap.dbt.actions
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.handlers.ExecutorHandler
|
||||
import com.google.inject.Inject
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.groovy.handling.GroovyChainAction
|
||||
|
||||
@Slf4j
|
||||
class ExecutorChainAction extends GroovyChainAction {
|
||||
|
||||
private final String executorPath
|
||||
|
||||
@Inject
|
||||
ExecutorChainAction(DBTConfig config) {
|
||||
executorPath = config.executor.path
|
||||
}
|
||||
|
||||
@Override
|
||||
void execute() throws Exception {
|
||||
log.info("Registering executor at $executorPath")
|
||||
path(executorPath, ExecutorHandler)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.devsoap.dbt.actions
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
|
||||
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
|
||||
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
|
||||
import com.google.inject.Inject
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.groovy.handling.GroovyChainAction
|
||||
import ratpack.handling.Handlers
|
||||
|
||||
@Slf4j
|
||||
class LedgerChainAction extends GroovyChainAction {
|
||||
|
||||
private final String ledgerPath
|
||||
|
||||
@Inject
|
||||
LedgerChainAction(DBTConfig config) {
|
||||
ledgerPath = config.ledger.path
|
||||
}
|
||||
|
||||
@Override
|
||||
void execute() throws Exception {
|
||||
log.info("Registering ledger at $ledgerPath")
|
||||
path(ledgerPath, Handlers.chain(
|
||||
registry.get(LedgerGetTransactionHandler),
|
||||
registry.get(LedgerUpdateTransactionHandler),
|
||||
registry.get(LedgerListTransactionsHandler)
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -2,4 +2,5 @@ package com.devsoap.dbt.config
|
||||
|
||||
class DBTConfig {
|
||||
LedgerConfig ledger = new LedgerConfig()
|
||||
ExecutorConfig executor = new ExecutorConfig()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.devsoap.dbt.config
|
||||
|
||||
class ExecutorConfig {
|
||||
String path = 'executor'
|
||||
String remoteUrl
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.devsoap.dbt.config
|
||||
|
||||
class LedgerConfig {
|
||||
String url = 'http://localhost:5050/ledger'
|
||||
String path = 'ledger'
|
||||
String remoteUrl
|
||||
}
|
||||
|
||||
82
src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy
Normal file
82
src/main/groovy/com/devsoap/dbt/data/BlockTransaction.groovy
Normal file
@@ -0,0 +1,82 @@
|
||||
package com.devsoap.dbt.data
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonValue
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import groovy.transform.ToString
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.security.MessageDigest
|
||||
|
||||
@ToString
|
||||
class BlockTransaction implements Serializable {
|
||||
|
||||
String id = UUID.randomUUID().toString().replace('-','')
|
||||
|
||||
List<Query> queries = []
|
||||
|
||||
boolean completed = false
|
||||
|
||||
boolean executed = false
|
||||
|
||||
boolean rolledback = false
|
||||
|
||||
/**
|
||||
* Executes a query in the transaction
|
||||
*
|
||||
* @param query
|
||||
* the query to execute
|
||||
* @return
|
||||
* the result of the query
|
||||
*/
|
||||
void execute(String query) {
|
||||
if(queries.isEmpty()){
|
||||
queries << new Query(this, query)
|
||||
} else {
|
||||
queries << new Query(queries.last(), query)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the current transaction
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
void end() {
|
||||
completed = true
|
||||
}
|
||||
|
||||
// A block in the chain
|
||||
@ToString
|
||||
static final class Query implements Serializable {
|
||||
String query
|
||||
String id
|
||||
String parent
|
||||
long timeStamp
|
||||
|
||||
Map<String, List> result
|
||||
|
||||
Query() {
|
||||
// For serialization
|
||||
}
|
||||
|
||||
Query(Query previous, String query) {
|
||||
this.query = query
|
||||
timeStamp = new Date().getTime()
|
||||
parent = previous.id
|
||||
id = generateHash()
|
||||
}
|
||||
|
||||
Query(BlockTransaction transaction, String query) {
|
||||
this.query = query
|
||||
timeStamp = new Date().getTime()
|
||||
parent = transaction.id
|
||||
id = generateHash()
|
||||
}
|
||||
|
||||
final String generateHash() {
|
||||
def digest = MessageDigest.getInstance('SHA-256')
|
||||
def hash = digest.digest("${parent?:''}$timeStamp$query".getBytes(StandardCharsets.UTF_8))
|
||||
hash.encodeHex().toString()
|
||||
}
|
||||
}
|
||||
}
|
||||
6
src/main/groovy/com/devsoap/dbt/data/LedgerData.groovy
Normal file
6
src/main/groovy/com/devsoap/dbt/data/LedgerData.groovy
Normal file
@@ -0,0 +1,6 @@
|
||||
package com.devsoap.dbt.data
|
||||
|
||||
class LedgerData implements Serializable {
|
||||
|
||||
List<BlockTransaction> transactions = new ArrayList<>()
|
||||
}
|
||||
@@ -1,10 +1,10 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.BlockTransaction
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode
|
||||
import groovy.util.logging.Log
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
@@ -14,11 +14,9 @@ import ratpack.jdbctx.Transaction
|
||||
import javax.sql.DataSource
|
||||
import java.sql.ResultSet
|
||||
|
||||
@Log
|
||||
@Slf4j
|
||||
class ExecutorHandler implements Handler {
|
||||
|
||||
static final String PATH = 'executor'
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) throws Exception {
|
||||
ctx.request.body.then { body ->
|
||||
@@ -27,7 +25,7 @@ class ExecutorHandler implements Handler {
|
||||
def transaction = mapper.readValue(body.text, BlockTransaction)
|
||||
|
||||
if(!validateChain(transaction)) {
|
||||
ctx.response.status = Status.of(400, 'Transaction chain invalid')
|
||||
ctx.response.status(Status.of(400, 'Transaction chain invalid'))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -38,25 +36,38 @@ class ExecutorHandler implements Handler {
|
||||
}
|
||||
}
|
||||
|
||||
boolean validateChain(BlockTransaction transaction) {
|
||||
//FIXME
|
||||
private static boolean validateChain(BlockTransaction transaction) {
|
||||
if(transaction.queries[0].parent != transaction.id) {
|
||||
return false
|
||||
}
|
||||
for(int i=1; i<transaction.queries.size(); i++) {
|
||||
def query = transaction.queries[i]
|
||||
def prev = transaction.queries[i-1]
|
||||
if(query.id != query.generateHash()) {
|
||||
return false
|
||||
}
|
||||
if(query.parent != prev.generateHash()) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
Promise<BlockTransaction> executeCommands(DataSource ds, ObjectMapper mapper, BlockTransaction transaction) {
|
||||
private static Promise<BlockTransaction> executeCommands(DataSource ds, ObjectMapper mapper, BlockTransaction transaction) {
|
||||
def txDs = Transaction.dataSource(ds)
|
||||
def tx = Transaction.create { ds.connection }
|
||||
tx.wrap {
|
||||
Promise.sync {
|
||||
transaction.queries.each { block ->
|
||||
log.info "Executing $block.data ..."
|
||||
if(block.data.toLowerCase().startsWith("select")){
|
||||
log.info "Executing $block.query ..."
|
||||
if(block.query.toLowerCase().startsWith("select")){
|
||||
log.info('Saving result from Select query')
|
||||
def result = txDs.connection
|
||||
.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE)
|
||||
.executeQuery(block.data)
|
||||
block.result = toJson(mapper, result)
|
||||
.executeQuery(block.query)
|
||||
block.result = toMap(result)
|
||||
} else {
|
||||
txDs.connection.createStatement().execute(block.data)
|
||||
txDs.connection.createStatement().execute(block.query)
|
||||
}
|
||||
}
|
||||
transaction
|
||||
@@ -64,30 +75,26 @@ class ExecutorHandler implements Handler {
|
||||
}
|
||||
}
|
||||
|
||||
private static JsonNode toJson(ObjectMapper mapper, ResultSet resultSet) {
|
||||
def json = mapper.createObjectNode()
|
||||
private static Map toMap(ResultSet resultSet) {
|
||||
def map = [:]
|
||||
|
||||
if(resultSet.last()) {
|
||||
int rows = resultSet.row
|
||||
log.info("Converting $rows rows to json")
|
||||
resultSet.beforeFirst()
|
||||
|
||||
resultSet.metaData.columnCount.times { column ->
|
||||
def columnIndex = column + 1
|
||||
def columnName = resultSet.metaData.getColumnName(columnIndex)
|
||||
ArrayNode columnValue = json.get(columnName)
|
||||
if(!columnValue) {
|
||||
columnValue = mapper.createArrayNode()
|
||||
json.set(columnName, columnValue)
|
||||
|
||||
def columnValues = map[columnName] as List
|
||||
if(columnValues == null) {
|
||||
map[columnName] = columnValues = []
|
||||
}
|
||||
|
||||
resultSet.beforeFirst()
|
||||
while(resultSet.next()) {
|
||||
columnValue.addPOJO(resultSet.getObject(columnIndex))
|
||||
columnValues << resultSet.getObject(columnIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
//mapper.writeValueAsString(json)
|
||||
mapper.valueToTree(json)
|
||||
map
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
import groovy.util.logging.Log
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
import ratpack.http.HttpMethod
|
||||
import ratpack.jackson.Jackson
|
||||
import ratpack.session.Session
|
||||
|
||||
@Slf4j
|
||||
class LedgerGetTransactionHandler implements Handler {
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) throws Exception {
|
||||
ctx.with {
|
||||
if(request.method == HttpMethod.GET && header('X-Transaction-Id').present) {
|
||||
def id = request.headers['X-Transaction-Id'].toString()
|
||||
def ledgerService = ctx.get(LedgerService)
|
||||
def session = ctx.get(Session)
|
||||
ledgerService.fetchTransaction(session, id).then {
|
||||
if(it.present) {
|
||||
render(Jackson.json(it.get()))
|
||||
} else {
|
||||
notFound()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
next()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.BlockTransaction
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import groovy.util.logging.Log
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
|
||||
@Log
|
||||
class LedgerHandler implements Handler {
|
||||
|
||||
static final String PATH = 'ledger'
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) {
|
||||
def ledgerService = ctx.get(LedgerService)
|
||||
|
||||
ctx.byMethod {
|
||||
delegate = it
|
||||
|
||||
get({
|
||||
def transaction = ledgerService.fetchTransaction(ctx.request.headers['X-Transaction-Id'].toString())
|
||||
def mapper = ctx.get(ObjectMapper)
|
||||
ctx.response.send(mapper.writeValueAsString(transaction))
|
||||
} as Handler)
|
||||
|
||||
post({
|
||||
ctx.request.body.then { body ->
|
||||
def mapper = ctx.get(ObjectMapper)
|
||||
def transaction = mapper.readValue(body.text, BlockTransaction)
|
||||
|
||||
def existingTransaction = ledgerService.fetchTransaction(transaction.id)
|
||||
if(existingTransaction) {
|
||||
ledgerService.updateTransaction(transaction)
|
||||
} else {
|
||||
log.info("Creating new transaction")
|
||||
ledgerService.newTransaction(transaction)
|
||||
}
|
||||
if(transaction.completed){
|
||||
log.info("Sending transaction $transaction.id to executor")
|
||||
ctx.redirect(ExecutorHandler.PATH)
|
||||
} else {
|
||||
ctx.response.send(mapper.writeValueAsString(transaction))
|
||||
}
|
||||
}
|
||||
} as Handler)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
import ratpack.http.HttpMethod
|
||||
import ratpack.jackson.Jackson
|
||||
import ratpack.session.Session
|
||||
|
||||
@Slf4j
|
||||
class LedgerListTransactionsHandler implements Handler {
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) throws Exception {
|
||||
ctx.with {
|
||||
if(request.method == HttpMethod.GET && !header('X-Transaction-Id').present) {
|
||||
def session = get(Session)
|
||||
log.info("Listing transactions in session $session.id")
|
||||
def ledgerService = get(LedgerService)
|
||||
ledgerService.allTransactions(session).then {
|
||||
render(Jackson.json(it))
|
||||
}
|
||||
} else {
|
||||
next()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package com.devsoap.dbt.handlers
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.devsoap.dbt.data.LedgerData
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import groovy.util.logging.Log
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.config.ConfigData
|
||||
import ratpack.handling.Context
|
||||
import ratpack.handling.Handler
|
||||
import ratpack.http.HttpMethod
|
||||
import ratpack.jackson.Jackson
|
||||
import ratpack.server.ServerConfig
|
||||
import ratpack.session.Session
|
||||
|
||||
import javax.inject.Inject
|
||||
|
||||
@Slf4j
|
||||
class LedgerUpdateTransactionHandler implements Handler {
|
||||
|
||||
private final String executorUrl
|
||||
|
||||
@Inject
|
||||
LedgerUpdateTransactionHandler(DBTConfig config) {
|
||||
executorUrl = config.executor.remoteUrl
|
||||
}
|
||||
|
||||
@Override
|
||||
void handle(Context ctx) throws Exception {
|
||||
ctx.with {
|
||||
if(ctx.request.method == HttpMethod.POST) {
|
||||
if(!executorUrl) {
|
||||
throw new RuntimeException("Executor URL is not set, cannot update transaction")
|
||||
}
|
||||
|
||||
def ledgerService = get(LedgerService)
|
||||
def session = get(Session)
|
||||
request.body.then { body ->
|
||||
def mapper = get(ObjectMapper)
|
||||
def transaction = mapper.readValue(body.text, BlockTransaction)
|
||||
log.info("Recieved transaction $transaction.id")
|
||||
ledgerService.fetchTransaction(session,transaction.id).then {
|
||||
if(it.present) {
|
||||
log.info "Transaction $transaction.id exists, updating transaction"
|
||||
ledgerService.updateTransaction(session, transaction).then {
|
||||
log.info("Transaction $it updated in ledger")
|
||||
if(transaction.completed){
|
||||
log.info("Sending transaction $transaction.id to executor at $executorUrl")
|
||||
redirect(executorUrl)
|
||||
} else {
|
||||
render(Jackson.json(transaction))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.info("Creating new transaction")
|
||||
ledgerService.newTransaction(session, transaction).then {
|
||||
log.info("Transaction $it added to ledger")
|
||||
if(transaction.completed){
|
||||
log.info("Sending transaction $transaction.id to executor at $executorUrl")
|
||||
redirect(executorUrl)
|
||||
} else {
|
||||
render(Jackson.json(transaction))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
next()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,33 +1,35 @@
|
||||
package com.devsoap.dbt.services
|
||||
|
||||
import com.devsoap.dbt.BlockTransaction
|
||||
import groovy.util.logging.Log
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.devsoap.dbt.data.LedgerData
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.service.Service
|
||||
import ratpack.service.StartEvent
|
||||
import ratpack.service.StopEvent
|
||||
import ratpack.session.Session
|
||||
|
||||
@Log
|
||||
@Slf4j
|
||||
class LedgerService implements Service {
|
||||
|
||||
static final transient List<BlockTransaction> transactions = []
|
||||
private static final LedgerData data = new LedgerData()
|
||||
|
||||
BlockTransaction fetchTransaction(String transactionId) {
|
||||
log.info("Fetching transaction $transactionId")
|
||||
log.info("Transactions:$transactions")
|
||||
transactions.find {it.id == transactionId}
|
||||
Promise<Optional<BlockTransaction>> fetchTransaction(Session session, String transactionId) {
|
||||
Promise.value(Optional.ofNullable(data.transactions.find {it.id == transactionId}))
|
||||
}
|
||||
|
||||
String newTransaction(BlockTransaction transaction) {
|
||||
log.info("Adding new transaction $transaction.id")
|
||||
transactions << transaction
|
||||
transaction.id
|
||||
Promise<List<BlockTransaction>> allTransactions(Session session) {
|
||||
Promise.value(data.transactions)
|
||||
}
|
||||
|
||||
String updateTransaction(BlockTransaction transaction) {
|
||||
log.info("Updating transaction $transaction.id")
|
||||
def existingTransaction = fetchTransaction(transaction.id)
|
||||
def index = transactions.indexOf(existingTransaction)
|
||||
transactions.remove(index)
|
||||
transactions.add(index, transaction)
|
||||
Promise<String> newTransaction(Session session, BlockTransaction transaction) {
|
||||
log.info("Adding new transaction $transaction.id to session ${session.id}")
|
||||
data.transactions.add(transaction)
|
||||
Promise.value(transaction.id)
|
||||
}
|
||||
|
||||
Promise<String> updateTransaction(Session session, BlockTransaction transaction) {
|
||||
log.info("Updating transaction $transaction.id in session ${session.id}")
|
||||
data.transactions.removeAll {it.id == transaction.id}
|
||||
data.transactions.add(transaction)
|
||||
Promise.value(transaction.id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,50 +1,63 @@
|
||||
package com.devsoap.dbt
|
||||
package com.devsoap.dbt.services
|
||||
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import groovy.util.logging.Log
|
||||
import groovy.util.logging.Slf4j
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.http.Status
|
||||
import ratpack.http.client.HttpClient
|
||||
import ratpack.service.Service
|
||||
|
||||
import javax.inject.Inject
|
||||
|
||||
@Log
|
||||
class DBTManager implements Service {
|
||||
|
||||
private final String ledgerUrl
|
||||
@Slf4j
|
||||
class TransactionManagerService implements Service {
|
||||
private final HttpClient httpClient
|
||||
private final ObjectMapper mapper
|
||||
private final DBTConfig config
|
||||
|
||||
@Inject
|
||||
DBTManager(DBTConfig config, HttpClient httpClient, ObjectMapper mapper){
|
||||
ledgerUrl = config.ledger.url
|
||||
TransactionManagerService(DBTConfig config, HttpClient httpClient, ObjectMapper mapper){
|
||||
this.config = config
|
||||
this.httpClient = httpClient
|
||||
this.mapper = mapper
|
||||
}
|
||||
|
||||
Promise<JsonNode> execute(ExecuteQuery queryBuilder) {
|
||||
log.info("Executing new transaction")
|
||||
def builder = new TransactionBuilder(this)
|
||||
if(!config.ledger.remoteUrl) {
|
||||
throw new RuntimeException("Ledger remote url is not set, cannot execute query")
|
||||
}
|
||||
|
||||
def builder = new TransactionBuilder()
|
||||
queryBuilder.build(builder)
|
||||
def transaction = builder.build()
|
||||
|
||||
log.info("Sending transaction $transaction.id to ledger")
|
||||
httpClient.post(ledgerUrl.toURI(), { spec ->
|
||||
log.info("Sending transaction $transaction.id to ledger at $config.ledger.remoteUrl")
|
||||
httpClient.post(config.ledger.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}).flatMap { response ->
|
||||
Promise.value(mapper.readTree(response.body.text))
|
||||
}).onError {
|
||||
log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl")
|
||||
}.map { response ->
|
||||
if(response.status == Status.OK) {
|
||||
mapper.readTree(response.body.text)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Promise<JsonNode> execute(String transactionId, ExecuteQuery queryBuilder) {
|
||||
log.info("Amending existing transaction $transactionId")
|
||||
if(!config.ledger.remoteUrl) {
|
||||
throw new RuntimeException("Ledger remote url is not set, cannot execute query")
|
||||
}
|
||||
|
||||
log.info("Getting transaction $transactionId from ledger")
|
||||
httpClient.get(ledgerUrl.toURI(), { spec ->
|
||||
log.info("Sending transaction $transactionId to ledger at $config.ledger.remoteUrl")
|
||||
httpClient.get(config.ledger.remoteUrl.toURI(), { spec ->
|
||||
spec.headers.add('X-Transaction-Id', transactionId)
|
||||
}).flatMap { response ->
|
||||
if(response.status != Status.OK) {
|
||||
throw new RuntimeException("Server returned ${response.statusCode} ${response.status.message}")
|
||||
}
|
||||
def oldTransaction = mapper.readValue(response.body.text, BlockTransaction)
|
||||
if(oldTransaction == null) {
|
||||
throw new RuntimeException("Transaction with id $transactionId could not be found")
|
||||
@@ -53,7 +66,8 @@ class DBTManager implements Service {
|
||||
throw new RuntimeException("Cannot modify a completed transaction")
|
||||
}
|
||||
|
||||
def builder = new TransactionBuilder(this, oldTransaction)
|
||||
log.info("Updating transaction $transactionId content with new query")
|
||||
def builder = new TransactionBuilder(oldTransaction)
|
||||
queryBuilder.build(builder)
|
||||
def transaction = builder.build()
|
||||
|
||||
@@ -61,12 +75,14 @@ class DBTManager implements Service {
|
||||
throw new RuntimeException("Transaction id changed")
|
||||
}
|
||||
|
||||
log.info("Sending transaction $transactionId to ledger")
|
||||
httpClient.post(ledgerUrl.toURI(), { spec ->
|
||||
log.info("Sending updated transaction $transaction.id to ledger at $config.ledger.remoteUrl")
|
||||
httpClient.post(config.ledger.remoteUrl.toURI(), { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
})
|
||||
}.flatMap { response ->
|
||||
Promise.value(mapper.readTree(response.body.text))
|
||||
}).onError {
|
||||
log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl")
|
||||
}
|
||||
}.map { response ->
|
||||
mapper.readTree(response.body.text)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,16 +90,13 @@ class DBTManager implements Service {
|
||||
|
||||
private final List<String> queries = []
|
||||
|
||||
private final DBTManager manager
|
||||
|
||||
private final BlockTransaction transaction
|
||||
|
||||
private TransactionBuilder(DBTManager manager){
|
||||
this(manager, new BlockTransaction())
|
||||
private TransactionBuilder() {
|
||||
this.transaction = new BlockTransaction()
|
||||
}
|
||||
|
||||
private TransactionBuilder(DBTManager manager, BlockTransaction transaction) {
|
||||
this.manager = manager
|
||||
private TransactionBuilder(BlockTransaction transaction) {
|
||||
this.transaction = transaction
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import com.devsoap.dbt.DBTModule
|
||||
import com.devsoap.dbt.app.DatabaseService
|
||||
import com.devsoap.dbt.config.DBTConfig
|
||||
import com.devsoap.dbt.DBTManager
|
||||
import com.devsoap.dbt.handlers.ExecutorHandler
|
||||
import com.devsoap.dbt.handlers.LedgerHandler
|
||||
import com.devsoap.dbt.services.LedgerService
|
||||
|
||||
import com.devsoap.dbt.services.TransactionManagerService
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import org.h2.jdbcx.JdbcDataSource
|
||||
import ratpack.config.ConfigData
|
||||
import ratpack.session.SessionModule
|
||||
|
||||
import javax.sql.DataSource
|
||||
|
||||
@@ -14,41 +16,29 @@ import static ratpack.groovy.Groovy.ratpack
|
||||
ratpack {
|
||||
|
||||
serverConfig {
|
||||
|
||||
/**
|
||||
* DBT Framework config
|
||||
*/
|
||||
require("", DBTConfig)
|
||||
sysProps()
|
||||
require('/dbt', DBTConfig)
|
||||
}
|
||||
|
||||
bindings {
|
||||
bindInstance(new ObjectMapper())
|
||||
|
||||
module SessionModule
|
||||
module (DBTModule) {
|
||||
it.ledger.remoteUrl = 'http://localhost:5050/ledger'
|
||||
it.executor.remoteUrl = 'http://localhost:5050/executor'
|
||||
}
|
||||
|
||||
bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: ''))
|
||||
bindInstance(new DatabaseService())
|
||||
|
||||
|
||||
/**
|
||||
* DBT Framework manager
|
||||
*/
|
||||
bind(DBTManager)
|
||||
bind(LedgerService)
|
||||
bind(ExecutorHandler)
|
||||
bind(LedgerHandler)
|
||||
bind DatabaseService
|
||||
}
|
||||
|
||||
handlers {
|
||||
|
||||
/**
|
||||
* DBT Framework handlers
|
||||
*/
|
||||
path('executor', ExecutorHandler)
|
||||
path('ledger', LedgerHandler)
|
||||
|
||||
/**
|
||||
* Consumer services
|
||||
*/
|
||||
get('frontend') {
|
||||
get(DBTManager).execute { transaction ->
|
||||
get(TransactionManagerService).execute { transaction ->
|
||||
transaction.query("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (${new Random().nextInt()}, 'HELLO')")
|
||||
}.then {
|
||||
redirect("/gateway/${it['id'].textValue()}")
|
||||
@@ -56,7 +46,7 @@ ratpack {
|
||||
}
|
||||
|
||||
get('gateway/:transactionId?') {
|
||||
get(DBTManager).execute(pathTokens.transactionId, { transaction ->
|
||||
get(TransactionManagerService).execute(pathTokens.transactionId, { transaction ->
|
||||
transaction.query("INSERT INTO LOGS(LOG_ID,LOG_VALUE) VALUES (${new Random().nextInt()}, 'WORLD')")
|
||||
}).then {
|
||||
redirect("/gateway2/${it['id'].textValue()}")
|
||||
@@ -64,7 +54,7 @@ ratpack {
|
||||
}
|
||||
|
||||
get('gateway2/:transactionId?') {
|
||||
get(DBTManager).execute(pathTokens.transactionId, { transaction ->
|
||||
get(TransactionManagerService).execute(pathTokens.transactionId, { transaction ->
|
||||
transaction.query("SELECT * FROM LOGS")
|
||||
transaction.complete()
|
||||
}).then {
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.devsoap.dbt.framework
|
||||
|
||||
import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest
|
||||
import ratpack.impose.Impositions
|
||||
import ratpack.impose.ServerConfigImposition
|
||||
import ratpack.server.RatpackServer
|
||||
|
||||
class CustomPortMainApplicationUnderTest extends GroovyRatpackMainApplicationUnderTest {
|
||||
|
||||
private final int port
|
||||
|
||||
CustomPortMainApplicationUnderTest(int port) {
|
||||
this.port = port
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Impositions createImpositions() throws Exception {
|
||||
Impositions.of {
|
||||
it.add(ServerConfigImposition.of {
|
||||
it.port(port)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RatpackServer createServer() throws Exception {
|
||||
System.setProperty('ratpack.dbt.ledger.remoteUrl', "http://localhost:$port/ledger")
|
||||
System.setProperty('ratpack.dbt.executor.remoteUrl', "http://localhost:$port/executor")
|
||||
return super.createServer()
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.devsoap.dbt.framework
|
||||
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest
|
||||
import spock.lang.AutoCleanup
|
||||
import spock.lang.Specification
|
||||
|
||||
@@ -9,8 +9,26 @@ class ExecutorSpec extends Specification {
|
||||
|
||||
def mapper = new ObjectMapper()
|
||||
|
||||
def PATH = 'executor'
|
||||
|
||||
@AutoCleanup
|
||||
def aut = new GroovyRatpackMainApplicationUnderTest()
|
||||
def aut = new CustomPortMainApplicationUnderTest(8888)
|
||||
|
||||
void 'transaction sent to executor'() {
|
||||
setup:
|
||||
def transaction = new BlockTransaction()
|
||||
transaction.execute("SELECT * FROM LOGS")
|
||||
transaction.end()
|
||||
when:
|
||||
String json = aut.httpClient.requestSpec{ spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.postText(PATH)
|
||||
def recievedTransaction = mapper.readValue(json, BlockTransaction)
|
||||
then:
|
||||
recievedTransaction.id == transaction.id
|
||||
recievedTransaction.completed
|
||||
recievedTransaction.executed
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,32 +1,40 @@
|
||||
package com.devsoap.dbt.framework
|
||||
|
||||
import com.devsoap.dbt.BlockTransaction
|
||||
import com.devsoap.dbt.handlers.LedgerHandler
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import groovy.json.JsonSlurper
|
||||
import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest
|
||||
import ratpack.impose.Imposition
|
||||
import ratpack.impose.Impositions
|
||||
import ratpack.impose.ImpositionsSpec
|
||||
import ratpack.impose.ServerConfigImposition
|
||||
import ratpack.server.RatpackServer
|
||||
import spock.lang.AutoCleanup
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Specification
|
||||
|
||||
class LedgerSpec extends Specification {
|
||||
|
||||
def mapper = new ObjectMapper()
|
||||
def jsonSlurper = new JsonSlurper()
|
||||
|
||||
def PATH = 'ledger'
|
||||
|
||||
@AutoCleanup
|
||||
def aut = new GroovyRatpackMainApplicationUnderTest()
|
||||
GroovyRatpackMainApplicationUnderTest aut = new CustomPortMainApplicationUnderTest(8888)
|
||||
|
||||
void 'transaction sent to ledger'() {
|
||||
setup:
|
||||
def transaction = new BlockTransaction()
|
||||
transaction.execute("SELECT * FROM LOGS")
|
||||
when:
|
||||
def response = mapper.readValue(aut.httpClient.requestSpec { spec ->
|
||||
String json = aut.httpClient.requestSpec{ spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.post(LedgerHandler.PATH).body.text, BlockTransaction)
|
||||
}.postText(PATH)
|
||||
def recievedTransaction = mapper.readValue(json, BlockTransaction)
|
||||
then:
|
||||
response.id == transaction.id
|
||||
response.completed == false
|
||||
recievedTransaction.id == transaction.id
|
||||
!recievedTransaction.completed
|
||||
}
|
||||
|
||||
void 'completed transaction marked as completed'() {
|
||||
@@ -35,12 +43,13 @@ class LedgerSpec extends Specification {
|
||||
transaction.execute("SELECT * FROM LOGS")
|
||||
transaction.end()
|
||||
when:
|
||||
def response = mapper.readValue(aut.httpClient.requestSpec { spec ->
|
||||
String json = aut.httpClient.requestSpec{ spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.post(LedgerHandler.PATH).body.text, BlockTransaction)
|
||||
}.postText(PATH)
|
||||
def recievedTransaction = mapper.readValue(json, BlockTransaction)
|
||||
then:
|
||||
response.id == transaction.id
|
||||
response.completed == true
|
||||
recievedTransaction.id == transaction.id
|
||||
recievedTransaction.completed
|
||||
}
|
||||
|
||||
void 'completed transaction sent to executor from ledger'() {
|
||||
@@ -51,7 +60,7 @@ class LedgerSpec extends Specification {
|
||||
when:
|
||||
def response = mapper.readValue(aut.httpClient.requestSpec { spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.post(LedgerHandler.PATH).body.text, BlockTransaction)
|
||||
}.post(PATH).body.text, BlockTransaction)
|
||||
then:
|
||||
response.id == transaction.id
|
||||
response.executed == true
|
||||
@@ -66,13 +75,16 @@ class LedgerSpec extends Specification {
|
||||
transaction.execute("SELECT * FROM LOGS")
|
||||
transaction.end()
|
||||
when:
|
||||
def response = mapper.readValue(aut.httpClient.requestSpec { spec ->
|
||||
String json = aut.httpClient.requestSpec{ spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.post(LedgerHandler.PATH).body.text, BlockTransaction)
|
||||
def json = jsonSlurper.parseText(response.queries[1].result)
|
||||
}.postText(PATH)
|
||||
def recievedTransaction = mapper.readValue(json, BlockTransaction)
|
||||
def result = recievedTransaction.queries[1].result
|
||||
then:
|
||||
response.id == transaction.id
|
||||
json.LOG_ID.first() == 1
|
||||
json.LOG_VALUE.first() == 'HELLO'
|
||||
recievedTransaction.id == transaction.id
|
||||
recievedTransaction.queries.first().result == null // insert query has no result
|
||||
|
||||
result['LOG_ID'].first() == 1
|
||||
result['LOG_VALUE'].first() == 'HELLO'
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.devsoap.dbt.framework
|
||||
|
||||
import com.devsoap.dbt.data.BlockTransaction
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import ratpack.groovy.test.GroovyRatpackMainApplicationUnderTest
|
||||
import spock.lang.AutoCleanup
|
||||
import spock.lang.Specification
|
||||
|
||||
class TransactionManagementServiceSpec extends Specification {
|
||||
|
||||
|
||||
def mapper = new ObjectMapper()
|
||||
|
||||
def PATH = 'ledger'
|
||||
|
||||
@AutoCleanup
|
||||
GroovyRatpackMainApplicationUnderTest aut = new CustomPortMainApplicationUnderTest(8888)
|
||||
|
||||
void 'transaction sent to ledger'() {
|
||||
setup:
|
||||
def transaction = new BlockTransaction()
|
||||
transaction.execute("SELECT * FROM LOGS")
|
||||
when:
|
||||
String json = aut.httpClient.requestSpec{ spec ->
|
||||
spec.body.text(mapper.writeValueAsString(transaction))
|
||||
}.postText(PATH)
|
||||
def recievedTransaction = mapper.readValue(json, BlockTransaction)
|
||||
then:
|
||||
recievedTransaction.id == transaction.id
|
||||
!recievedTransaction.completed
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user