1

Split project into sub-projects

This commit is contained in:
2018-05-04 17:23:08 +03:00
parent a3e6f119e7
commit fb4baec57f
28 changed files with 119 additions and 42 deletions

6
dbt-core/build.gradle Normal file
View File

@@ -0,0 +1,6 @@
dependencies {
compile ratpack.dependency('jdbc-tx')
compile ratpack.dependency('session')
}

View File

@@ -0,0 +1,62 @@
package com.devsoap.dbt
import com.devsoap.dbt.actions.ExecutorChainAction
import com.devsoap.dbt.actions.LedgerChainAction
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.LedgerData
import com.devsoap.dbt.handlers.ExecutorHandler
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
import com.devsoap.dbt.services.LedgerService
import com.devsoap.dbt.services.TransactionManagerService
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.inject.multibindings.Multibinder
import groovy.util.logging.Slf4j
import ratpack.guice.ConfigurableModule
import ratpack.handling.HandlerDecorator
import ratpack.server.ServerConfig
import ratpack.session.Session
@Slf4j
class DBTModule extends ConfigurableModule<DBTConfig> {
@Override
protected void configure() {
bind(ObjectMapper)
bind(LedgerChainAction)
bind(LedgerGetTransactionHandler)
bind(LedgerListTransactionsHandler)
bind(LedgerUpdateTransactionHandler)
bind(ExecutorChainAction)
bind(ExecutorHandler)
bind(LedgerData)
bind(LedgerService)
bind(TransactionManagerService)
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(LedgerChainAction))
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(ExecutorChainAction))
}
@Override
protected DBTConfig createConfig(ServerConfig serverConfig) {
(DBTConfig) serverConfig.getAsConfigObject('/dbt', DBTConfig)?.getObject() ?: super.createConfig(serverConfig)
}
@Override
protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) {
if(!config.executor.remoteUrl) {
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.executor.path}"
}
if(!config.ledger.remoteUrl) {
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.ledger.path}"
}
}
}

View File

@@ -0,0 +1,24 @@
package com.devsoap.dbt.actions
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.handlers.ExecutorHandler
import com.google.inject.Inject
import groovy.util.logging.Slf4j
import ratpack.groovy.handling.GroovyChainAction
@Slf4j
class ExecutorChainAction extends GroovyChainAction {
private final String executorPath
@Inject
ExecutorChainAction(DBTConfig config) {
executorPath = config.executor.path
}
@Override
void execute() throws Exception {
log.info("Registering executor at $executorPath")
path(executorPath, ExecutorHandler)
}
}

View File

@@ -0,0 +1,31 @@
package com.devsoap.dbt.actions
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
import com.google.inject.Inject
import groovy.util.logging.Slf4j
import ratpack.groovy.handling.GroovyChainAction
import ratpack.handling.Handlers
@Slf4j
class LedgerChainAction extends GroovyChainAction {
private final String ledgerPath
@Inject
LedgerChainAction(DBTConfig config) {
ledgerPath = config.ledger.path
}
@Override
void execute() throws Exception {
log.info("Registering ledger at $ledgerPath")
path(ledgerPath, Handlers.chain(
registry.get(LedgerGetTransactionHandler),
registry.get(LedgerUpdateTransactionHandler),
registry.get(LedgerListTransactionsHandler)
))
}
}

View File

@@ -0,0 +1,6 @@
package com.devsoap.dbt.config
class DBTConfig {
LedgerConfig ledger = new LedgerConfig()
ExecutorConfig executor = new ExecutorConfig()
}

View File

@@ -0,0 +1,6 @@
package com.devsoap.dbt.config
class ExecutorConfig {
String path = 'executor'
String remoteUrl
}

View File

@@ -0,0 +1,6 @@
package com.devsoap.dbt.config
class LedgerConfig {
String path = 'ledger'
String remoteUrl
}

View File

@@ -0,0 +1,82 @@
package com.devsoap.dbt.data
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.databind.JsonNode
import groovy.transform.ToString
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
@ToString
class BlockTransaction implements Serializable {
String id = UUID.randomUUID().toString().replace('-','')
List<Query> queries = []
boolean completed = false
boolean executed = false
boolean rolledback = false
/**
* Executes a query in the transaction
*
* @param query
* the query to execute
* @return
* the result of the query
*/
void execute(String query) {
if(queries.isEmpty()){
queries << new Query(this, query)
} else {
queries << new Query(queries.last(), query)
}
}
/**
* End the current transaction
*
* @return
*/
void end() {
completed = true
}
// A block in the chain
@ToString
static final class Query implements Serializable {
String query
String id
String parent
long timeStamp
Map<String, List> result
Query() {
// For serialization
}
Query(Query previous, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = previous.id
id = generateHash()
}
Query(BlockTransaction transaction, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = transaction.id
id = generateHash()
}
final String generateHash() {
def digest = MessageDigest.getInstance('SHA-256')
def hash = digest.digest("${parent?:''}$timeStamp$query".getBytes(StandardCharsets.UTF_8))
hash.encodeHex().toString()
}
}
}

View File

@@ -0,0 +1,6 @@
package com.devsoap.dbt.data
class LedgerData implements Serializable {
List<BlockTransaction> transactions = new ArrayList<>()
}

View File

@@ -0,0 +1,126 @@
package com.devsoap.dbt.handlers
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.BlockTransaction
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ArrayNode
import groovy.util.logging.Slf4j
import ratpack.exec.Promise
import ratpack.handling.Context
import ratpack.handling.Handler
import ratpack.http.Status
import ratpack.http.client.HttpClient
import ratpack.jdbctx.Transaction
import javax.inject.Inject
import javax.sql.DataSource
import java.sql.ResultSet
@Slf4j
class ExecutorHandler implements Handler {
private final DBTConfig config
private final HttpClient client
private final ObjectMapper mapper
@Inject
ExecutorHandler(DBTConfig config, HttpClient client, ObjectMapper mapper) {
this.mapper = mapper
this.client = client
this.config = config
}
@Override
void handle(Context ctx) throws Exception {
ctx.request.body.then { body ->
def mapper = ctx.get(ObjectMapper)
def ds = ctx.get(DataSource)
def transaction = mapper.readValue(body.text, BlockTransaction)
if(!validateChain(transaction)) {
ctx.response.status(Status.of(400, 'Transaction chain invalid'))
return
}
executeCommands(ds, mapper, transaction).then {
transaction.executed = true
// Notify ledger of result
log.info("Updating ledger with execution result")
client.post(config.ledger.remoteUrl.toURI(), { spec ->
spec.body.text(mapper.writeValueAsString(transaction))
}).then {
if(it.status != Status.OK) {
log.error("Failed to update ledger with execution result for transaction $transaction.id")
}
}
// Return transaction with result
ctx.response.send(mapper.writeValueAsString(transaction))
}
}
}
private static boolean validateChain(BlockTransaction transaction) {
if(transaction.queries[0].parent != transaction.id) {
return false
}
for(int i=1; i<transaction.queries.size(); i++) {
def query = transaction.queries[i]
def prev = transaction.queries[i-1]
if(query.id != query.generateHash()) {
return false
}
if(query.parent != prev.generateHash()) {
return false
}
}
true
}
private static Promise<BlockTransaction> executeCommands(DataSource ds, ObjectMapper mapper, BlockTransaction transaction) {
def txDs = Transaction.dataSource(ds)
def tx = Transaction.create { ds.connection }
tx.wrap {
Promise.sync {
transaction.queries.each { block ->
log.info "Executing $block.query ..."
if(block.query.toLowerCase().startsWith("select")){
log.info('Saving result from Select query')
def result = txDs.connection
.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE)
.executeQuery(block.query)
block.result = toMap(result)
} else {
txDs.connection.createStatement().execute(block.query)
}
}
transaction
}
}
}
private static Map toMap(ResultSet resultSet) {
def map = [:]
if(resultSet.last()) {
resultSet.beforeFirst()
resultSet.metaData.columnCount.times { column ->
def columnIndex = column + 1
def columnName = resultSet.metaData.getColumnName(columnIndex)
def columnValues = map[columnName] as List
if(columnValues == null) {
map[columnName] = columnValues = []
}
resultSet.beforeFirst()
while(resultSet.next()) {
columnValues << resultSet.getObject(columnIndex)
}
}
}
map
}
}

View File

@@ -0,0 +1,34 @@
package com.devsoap.dbt.handlers
import com.devsoap.dbt.services.LedgerService
import groovy.util.logging.Log
import groovy.util.logging.Slf4j
import ratpack.handling.Context
import ratpack.handling.Handler
import ratpack.http.HttpMethod
import ratpack.jackson.Jackson
import ratpack.session.Session
@Slf4j
class LedgerGetTransactionHandler implements Handler {
@Override
void handle(Context ctx) throws Exception {
ctx.with {
if(request.method == HttpMethod.GET && header('X-Transaction-Id').present) {
def id = request.headers['X-Transaction-Id'].toString()
def ledgerService = ctx.get(LedgerService)
def session = ctx.get(Session)
ledgerService.fetchTransaction(session, id).then {
if(it.present) {
render(Jackson.json(it.get()))
} else {
notFound()
}
}
} else {
next()
}
}
}
}

View File

@@ -0,0 +1,29 @@
package com.devsoap.dbt.handlers
import com.devsoap.dbt.services.LedgerService
import groovy.util.logging.Slf4j
import ratpack.handling.Context
import ratpack.handling.Handler
import ratpack.http.HttpMethod
import ratpack.jackson.Jackson
import ratpack.session.Session
@Slf4j
class LedgerListTransactionsHandler implements Handler {
@Override
void handle(Context ctx) throws Exception {
ctx.with {
if(request.method == HttpMethod.GET && !header('X-Transaction-Id').present) {
def session = get(Session)
log.info("Listing transactions in session $session.id")
def ledgerService = get(LedgerService)
ledgerService.allTransactions(session).then {
render(Jackson.json(it))
}
} else {
next()
}
}
}
}

View File

@@ -0,0 +1,75 @@
package com.devsoap.dbt.handlers
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.BlockTransaction
import com.devsoap.dbt.data.LedgerData
import com.devsoap.dbt.services.LedgerService
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.util.logging.Log
import groovy.util.logging.Slf4j
import ratpack.config.ConfigData
import ratpack.handling.Context
import ratpack.handling.Handler
import ratpack.http.HttpMethod
import ratpack.jackson.Jackson
import ratpack.server.ServerConfig
import ratpack.session.Session
import javax.inject.Inject
@Slf4j
class LedgerUpdateTransactionHandler implements Handler {
private final DBTConfig config
@Inject
LedgerUpdateTransactionHandler(DBTConfig config) {
this.config = config
}
@Override
void handle(Context ctx) throws Exception {
ctx.with {
if(ctx.request.method == HttpMethod.POST) {
if(!config.executor.remoteUrl) {
throw new RuntimeException("Executor URL is not set, cannot update transaction")
}
def ledgerService = get(LedgerService)
def session = get(Session)
request.body.then { body ->
def mapper = get(ObjectMapper)
def transaction = mapper.readValue(body.text, BlockTransaction)
log.info("Recieved transaction $transaction.id")
ledgerService.fetchTransaction(session,transaction.id).then {
if(it.present) {
log.info "Transaction $transaction.id exists, updating transaction"
ledgerService.updateTransaction(session, transaction).then {
log.info("Transaction $it updated in ledger")
if(transaction.completed && !transaction.executed){
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
redirect(config.executor.remoteUrl)
} else {
render(Jackson.json(transaction))
}
}
} else {
log.info("Creating new transaction")
ledgerService.newTransaction(session, transaction).then {
log.info("Transaction $it added to ledger")
if(transaction.completed && !transaction.executed){
log.info("Sending transaction $transaction.id to executor at $config.executor.remoteUrl")
redirect(config.executor.remoteUrl)
} else {
render(Jackson.json(transaction))
}
}
}
}
}
} else {
next()
}
}
}
}

View File

@@ -0,0 +1,35 @@
package com.devsoap.dbt.services
import com.devsoap.dbt.data.BlockTransaction
import com.devsoap.dbt.data.LedgerData
import groovy.util.logging.Slf4j
import ratpack.exec.Promise
import ratpack.service.Service
import ratpack.session.Session
@Slf4j
class LedgerService implements Service {
private static final LedgerData data = new LedgerData()
Promise<Optional<BlockTransaction>> fetchTransaction(Session session, String transactionId) {
Promise.value(Optional.ofNullable(data.transactions.find {it.id == transactionId}))
}
Promise<List<BlockTransaction>> allTransactions(Session session) {
Promise.value(data.transactions)
}
Promise<String> newTransaction(Session session, BlockTransaction transaction) {
log.info("Adding new transaction $transaction.id to session ${session.id}")
data.transactions.add(transaction)
Promise.value(transaction.id)
}
Promise<String> updateTransaction(Session session, BlockTransaction transaction) {
log.info("Updating transaction $transaction.id in session ${session.id}")
data.transactions.removeAll {it.id == transaction.id}
data.transactions.add(transaction)
Promise.value(transaction.id)
}
}

View File

@@ -0,0 +1,125 @@
package com.devsoap.dbt.services
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.BlockTransaction
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.util.logging.Slf4j
import ratpack.exec.Promise
import ratpack.http.Status
import ratpack.http.client.HttpClient
import ratpack.service.Service
import javax.inject.Inject
@Slf4j
class TransactionManagerService implements Service {
private final HttpClient httpClient
private final ObjectMapper mapper
private final DBTConfig config
@Inject
TransactionManagerService(DBTConfig config, HttpClient httpClient, ObjectMapper mapper){
this.config = config
this.httpClient = httpClient
this.mapper = mapper
}
Promise<JsonNode> execute(ExecuteQuery queryBuilder) {
if(!config.ledger.remoteUrl) {
throw new RuntimeException("Ledger remote url is not set, cannot execute query")
}
def builder = new TransactionBuilder()
queryBuilder.build(builder)
def transaction = builder.build()
log.info("Sending transaction $transaction.id to ledger at $config.ledger.remoteUrl")
httpClient.post(config.ledger.remoteUrl.toURI(), { spec ->
spec.body.text(mapper.writeValueAsString(transaction))
}).onError {
log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl")
}.map { response ->
if(response.status == Status.OK) {
mapper.readTree(response.body.text)
}
}
}
Promise<JsonNode> execute(String transactionId, ExecuteQuery queryBuilder) {
if(!config.ledger.remoteUrl) {
throw new RuntimeException("Ledger remote url is not set, cannot execute query")
}
log.info("Sending transaction $transactionId to ledger at $config.ledger.remoteUrl")
httpClient.get(config.ledger.remoteUrl.toURI(), { spec ->
spec.headers.add('X-Transaction-Id', transactionId)
}).flatMap { response ->
if(response.status != Status.OK) {
throw new RuntimeException("Server returned ${response.statusCode} ${response.status.message}")
}
def oldTransaction = mapper.readValue(response.body.text, BlockTransaction)
if(oldTransaction == null) {
throw new RuntimeException("Transaction with id $transactionId could not be found")
}
if(oldTransaction.completed) {
throw new RuntimeException("Cannot modify a completed transaction")
}
log.info("Updating transaction $transactionId content with new query")
def builder = new TransactionBuilder(oldTransaction)
queryBuilder.build(builder)
def transaction = builder.build()
if(transaction.id != transactionId) {
throw new RuntimeException("Transaction id changed")
}
log.info("Sending updated transaction $transaction.id to ledger at $config.ledger.remoteUrl")
httpClient.post(config.ledger.remoteUrl.toURI(), { spec ->
spec.body.text(mapper.writeValueAsString(transaction))
}).onError {
log.error("Failed to send transaction $transaction.id to ledger $config.ledger.remoteUrl")
}
}.map { response ->
mapper.readTree(response.body.text)
}
}
class TransactionBuilder {
private final List<String> queries = []
private final BlockTransaction transaction
private TransactionBuilder() {
this.transaction = new BlockTransaction()
}
private TransactionBuilder(BlockTransaction transaction) {
this.transaction = transaction
}
void query(String sql){
queries << sql
}
String id() {
transaction.id
}
void complete() {
transaction.end()
}
private BlockTransaction build() {
queries.each { transaction.execute(it) }
transaction
}
}
@FunctionalInterface
interface ExecuteQuery {
void build(TransactionBuilder builder)
}
}