1

Replace in-memory ledger with MongoDB

This commit is contained in:
2018-06-09 20:45:18 +03:00
parent 0e21b7eee2
commit 0f24c37ac5
17 changed files with 209 additions and 174 deletions

View File

@@ -1,4 +1,6 @@
dependencies { dependencies {
compile ratpack.dependency('jdbc-tx') compile ratpack.dependency('jdbc-tx')
//compile 'org.mongodb:mongo-java-driver:3.7.1'
compile 'com.gmongo:gmongo:1.2'
compile 'com.fasterxml.jackson.module:jackson-module-jsonSchema:2.9.0' compile 'com.fasterxml.jackson.module:jackson-module-jsonSchema:2.9.0'
} }

View File

@@ -1,80 +0,0 @@
/*
* Copyright 2018 Devsoap Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.devsoap.dbt
import com.devsoap.dbt.actions.ExecutorChainAction
import com.devsoap.dbt.actions.LedgerChainAction
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.LedgerData
import com.devsoap.dbt.handlers.ExecutorHandler
import com.devsoap.dbt.handlers.ConfigInfoHandler
import com.devsoap.dbt.handlers.JsonSchemaHandler
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
import com.devsoap.dbt.services.LedgerService
import com.devsoap.dbt.services.TransactionManagerService
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.inject.multibindings.Multibinder
import groovy.util.logging.Slf4j
import ratpack.guice.ConfigurableModule
import ratpack.handling.HandlerDecorator
import ratpack.server.ServerConfig
@Slf4j
class DBTModule extends ConfigurableModule<DBTConfig> {
@Override
protected void configure() {
bind(ObjectMapper)
bind(LedgerChainAction)
bind(LedgerGetTransactionHandler)
bind(LedgerListTransactionsHandler)
bind(LedgerUpdateTransactionHandler)
bind(ExecutorChainAction)
bind(ExecutorHandler)
bind(LedgerData)
bind(LedgerService)
bind(TransactionManagerService)
bind(ConfigInfoHandler)
bind(JsonSchemaHandler)
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(LedgerChainAction))
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(ExecutorChainAction))
}
@Override
protected DBTConfig createConfig(ServerConfig serverConfig) {
(DBTConfig) serverConfig.getAsConfigObject('/dbt', DBTConfig)?.getObject() ?: super.createConfig(serverConfig)
}
@Override
protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) {
if(!config.executor.remoteUrl) {
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.executor.path}"
}
if(!config.ledger.remoteUrl) {
config.ledger.remoteUrl = "http://localhost:${serverConfig.port}/${config.ledger.path}"
}
}
}

View File

@@ -31,4 +31,9 @@ class LedgerConfig {
* If the ledger is disabled, then this specifies the remote url of the ledger the executors use * If the ledger is disabled, then this specifies the remote url of the ledger the executors use
*/ */
String remoteUrl String remoteUrl
/**
* Database JDBC Url
*/
String databaseUrl
} }

View File

@@ -59,42 +59,4 @@ class BlockTransaction implements Serializable {
void commit() { void commit() {
completed = true completed = true
} }
// A block in the chain
@ToString
static final class Query implements Serializable {
@JsonProperty(required = true)
String query
String id
String parent
long timeStamp
Map<String, List> result
String resultError
Query() {
// For serialization
}
Query(Query previous, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = previous.id
id = generateHash()
}
Query(BlockTransaction transaction, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = transaction.id
id = generateHash()
}
final String generateHash() {
def digest = MessageDigest.getInstance('SHA-256')
def hash = digest.digest("${parent?:''}$timeStamp$query".getBytes(StandardCharsets.UTF_8))
hash.encodeHex().toString()
}
}
} }

View File

@@ -1,21 +0,0 @@
/*
* Copyright 2018 Devsoap Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.devsoap.dbt.data
class LedgerData implements Serializable {
List<BlockTransaction> transactions = new ArrayList<>()
}

View File

@@ -0,0 +1,46 @@
package com.devsoap.dbt.data
import com.fasterxml.jackson.annotation.JsonProperty
import groovy.transform.ToString
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
@ToString
class Query implements Serializable {
@JsonProperty(required = true)
String query
String id
String parent
long timeStamp
// Column -> Values
Map<String, List<String>> result
String resultError
Query() {
// For serialization
}
Query(Query previous, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = previous.id
id = generateHash()
}
Query(BlockTransaction transaction, String query) {
this.query = query
timeStamp = new Date().getTime()
parent = transaction.id
id = generateHash()
}
final String generateHash() {
def digest = MessageDigest.getInstance('SHA-256')
def hash = digest.digest("${parent?:''}$timeStamp$query".getBytes(StandardCharsets.UTF_8))
hash.encodeHex().toString()
}
}

View File

@@ -140,7 +140,7 @@ class ExecutorHandler implements Handler {
} }
} }
private static Map toMap(ResultSet resultSet) { private static Map<String,List<String>> toMap(ResultSet resultSet) {
def map = [:] def map = [:]
if(resultSet.last()) { if(resultSet.last()) {
@@ -156,7 +156,7 @@ class ExecutorHandler implements Handler {
resultSet.beforeFirst() resultSet.beforeFirst()
while(resultSet.next()) { while(resultSet.next()) {
columnValues << resultSet.getObject(columnIndex) columnValues << resultSet.getObject(columnIndex).toString()
} }
} }
} }

View File

@@ -17,6 +17,7 @@ package com.devsoap.dbt.handlers
import com.devsoap.dbt.config.DBTConfig import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.BlockTransaction import com.devsoap.dbt.data.BlockTransaction
import com.devsoap.dbt.data.Query
import com.devsoap.dbt.services.LedgerService import com.devsoap.dbt.services.LedgerService
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import groovy.util.logging.Slf4j import groovy.util.logging.Slf4j
@@ -128,8 +129,8 @@ class LedgerUpdateTransactionHandler implements Handler {
newTransaction.queries.each { q -> newTransaction.queries.each { q ->
def query = transaction.queries.isEmpty() ? def query = transaction.queries.isEmpty() ?
new BlockTransaction.Query(transaction, q.query) : new Query(transaction, q.query) :
new BlockTransaction.Query(transaction.queries.last(), q.query) new Query(transaction.queries.last(), q.query)
query.resultError = q.resultError query.resultError = q.resultError
query.result = q.result query.result = q.result
transaction.queries << query transaction.queries << query

View File

@@ -0,0 +1,30 @@
package com.devsoap.dbt.modules
import com.devsoap.dbt.actions.ExecutorChainAction
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.handlers.ExecutorHandler
import com.google.inject.multibindings.Multibinder
import ratpack.handling.HandlerDecorator
import ratpack.server.ServerConfig
class DBTExecutorModule extends DBTModule {
@Override
protected void configure() {
super.configure()
bind(ExecutorChainAction)
bind(ExecutorHandler)
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(ExecutorChainAction))
}
@Override
protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) {
config.executor.enabled = !config.executor.remoteUrl
if(!config.executor.remoteUrl) {
config.executor.remoteUrl = "http://localhost:${serverConfig.port}/${config.executor.path}"
}
}
}

View File

@@ -0,0 +1,36 @@
package com.devsoap.dbt.modules
import com.devsoap.dbt.actions.LedgerChainAction
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.handlers.LedgerGetTransactionHandler
import com.devsoap.dbt.handlers.LedgerListTransactionsHandler
import com.devsoap.dbt.handlers.LedgerUpdateTransactionHandler
import com.devsoap.dbt.services.LedgerService
import com.google.inject.multibindings.Multibinder
import ratpack.handling.HandlerDecorator
import ratpack.server.ServerConfig
class DBTLedgerModule extends DBTModule {
@Override
protected void configure() {
super.configure()
bind(LedgerChainAction)
bind(LedgerGetTransactionHandler)
bind(LedgerListTransactionsHandler)
bind(LedgerUpdateTransactionHandler)
bind(LedgerService)
Multibinder.newSetBinder(binder(), HandlerDecorator).addBinding()
.toInstance(HandlerDecorator.prependHandlers(LedgerChainAction))
}
@Override
protected void defaultConfig(ServerConfig serverConfig, DBTConfig config) {
config.ledger.enabled = !config.ledger.remoteUrl
if(!config.ledger.remoteUrl) {
config.ledger.remoteUrl = "http://localhost:${serverConfig.port}/${config.ledger.path}"
}
}
}

View File

@@ -15,35 +15,72 @@
*/ */
package com.devsoap.dbt.services package com.devsoap.dbt.services
import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.data.BlockTransaction import com.devsoap.dbt.data.BlockTransaction
import com.devsoap.dbt.data.LedgerData import com.fasterxml.jackson.databind.ObjectMapper
import com.gmongo.GMongo
import com.mongodb.BasicDBObject
import com.mongodb.BasicDBObjectBuilder
import com.mongodb.DB
import com.mongodb.DBCollection
import com.mongodb.DBCursor
import com.mongodb.DBObject
import com.mongodb.MongoURI
import com.mongodb.util.JSON
import groovy.util.logging.Slf4j import groovy.util.logging.Slf4j
import ratpack.exec.Promise import ratpack.exec.Promise
import ratpack.service.Service import ratpack.service.Service
import javax.inject.Inject
@Slf4j @Slf4j
class LedgerService implements Service { class LedgerService implements Service {
private static final LedgerData data = new LedgerData() private final String dbUrl
private DB db
private final ObjectMapper mapper
@Inject
LedgerService(DBTConfig config, ObjectMapper mapper) {
dbUrl = config.ledger.databaseUrl
this.mapper = mapper
}
Promise<Optional<BlockTransaction>> fetchTransaction(String transactionId) { Promise<Optional<BlockTransaction>> fetchTransaction(String transactionId) {
Promise.value(Optional.ofNullable(data.transactions.find {it.id == transactionId})) BlockTransaction transaction = transactions.findOne(['id':transactionId])?.findAll { it.key != '_id' } as BlockTransaction
Promise.value(Optional.ofNullable(transaction))
} }
Promise<List<BlockTransaction>> allTransactions() { Promise<List<BlockTransaction>> allTransactions() {
Promise.value(data.transactions) def cursor = transactions.find()
log.info("Found ${cursor.size()} transactions")
Promise.value(cursor.collect {it.findAll { it.key != '_id' } as BlockTransaction})
} }
Promise<String> newTransaction(BlockTransaction transaction) { Promise<String> newTransaction(BlockTransaction transaction) {
log.info("Adding new transaction $transaction.id") log.info("Adding new transaction $transaction.id")
data.transactions.add(transaction) transactions << JSON.parse(mapper.writeValueAsString(transaction))
Promise.value(transaction.id) Promise.value(transaction.id)
} }
Promise<String> updateTransaction(BlockTransaction transaction) { Promise<String> updateTransaction(BlockTransaction transaction) {
log.info("Updating transaction $transaction.id") log.info("Updating transaction $transaction.id")
data.transactions.removeAll {it.id == transaction.id} transactions.findAndModify(
data.transactions.add(transaction) ['id':transaction.id] as BasicDBObject,
JSON.parse(mapper.writeValueAsString(transaction)) as DBObject
)
Promise.value(transaction.id) Promise.value(transaction.id)
} }
private DBCollection getTransactions() {
database.getCollection('transactions')
}
private DB getDatabase() {
if(!db) {
db = new GMongo(new MongoURI(dbUrl)).getDB('dbt')
}
db
}
} }

View File

@@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
import com.devsoap.dbt.DBTModule
import com.devsoap.dbt.config.DBTConfig import com.devsoap.dbt.config.DBTConfig
import com.devsoap.dbt.modules.DBTExecutorModule
import com.devsoap.dbt.modules.DBTLedgerModule
import org.flywaydb.core.Flyway import org.flywaydb.core.Flyway
import org.h2.jdbcx.JdbcDataSource import org.h2.jdbcx.JdbcDataSource
import ratpack.service.Service import ratpack.service.Service
@@ -33,11 +35,14 @@ ratpack {
bindings { bindings {
module (DBTModule) { module (DBTLedgerModule) {
it.ledger.remoteUrl = 'http://localhost:8888/ledger'
it.executor.remoteUrl = 'http://localhost:8888/executor' it.executor.remoteUrl = 'http://localhost:8888/executor'
} }
module (DBTExecutorModule) {
it.ledger.remoteUrl = 'http://localhost:8888/ledger'
}
bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: '')) bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: ''))
bind FlywayMigrationService bind FlywayMigrationService
} }

View File

@@ -2,6 +2,22 @@ version: '2'
services: services:
ledger:
image: com.devsoap/dbt-ledger
container_name: dbt-ledger
environment:
- RATPACK_DBT__EXECUTOR__REMOTE_URL=http://executor:5050/executor
- RATPACK_DBT__LEDGER__DATABASE_URL=mongodb://ledger-db/dbt
- RATPACK_DEVELOPMENT=false
ports:
- "5050:5050"
depends_on:
- ledger-db
networks:
dbt:
aliases:
- ledger
executor: executor:
image: com.devsoap/dbt-executor image: com.devsoap/dbt-executor
container_name: dbt-executor container_name: dbt-executor
@@ -13,18 +29,17 @@ services:
aliases: aliases:
- executor - executor
ledger: ledger-db:
image: com.devsoap/dbt-ledger image: mongo:latest
container_name: dbt-ledger container_name: dbt-ledger-db
environment:
- RATPACK_DBT__EXECUTOR__REMOTE_URL=http://executor:5050/executor
- RATPACK_DEVELOPMENT=false
ports: ports:
- "5050:5050" - "27017:27017"
environment:
- MONGO_INITDB_DATABASE=dbt
networks: networks:
dbt: dbt:
aliases: aliases:
- ledger - ledger-db
networks: networks:
dbt: dbt:

View File

@@ -11,7 +11,6 @@ dependencies {
run { run {
environment('RATPACK_DBT__LEDGER__REMOTE_URL', findProperty('ledgerURL').toString() ?: "http://localhost:5050/ledger") environment('RATPACK_DBT__LEDGER__REMOTE_URL', findProperty('ledgerURL').toString() ?: "http://localhost:5050/ledger")
environment('RATPACK_DBT__LEDGER__ENABLED', false)
} }
distDocker { distDocker {

View File

@@ -1,4 +1,4 @@
import com.devsoap.dbt.DBTModule import com.devsoap.dbt.modules.DBTExecutorModule
import org.h2.jdbcx.JdbcDataSource import org.h2.jdbcx.JdbcDataSource
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@@ -16,7 +16,7 @@ ratpack {
bindings { bindings {
bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: '')) bindInstance(DataSource, new JdbcDataSource(url: 'jdbc:h2:mem:dbtdb;DB_CLOSE_DELAY=-1', user: ''))
module (DBTModule) { config -> module (DBTExecutorModule) { config ->
log.info "Executor available at $config.executor.remoteUrl" log.info "Executor available at $config.executor.remoteUrl"
log.info "Ledger available at $config.ledger.remoteUrl" log.info "Ledger available at $config.ledger.remoteUrl"
} }

View File

@@ -11,7 +11,6 @@ dependencies {
run { run {
environment('RATPACK_DBT__EXECUTOR__REMOTE_URL', findProperty('executorURL').toString() ?: "http://localhost:5050/executor") environment('RATPACK_DBT__EXECUTOR__REMOTE_URL', findProperty('executorURL').toString() ?: "http://localhost:5050/executor")
environment('RATPACK_DBT__EXECUTOR__ENABLED', false)
} }
distDocker { distDocker {

View File

@@ -1,9 +1,8 @@
import com.devsoap.dbt.DBTModule import com.devsoap.dbt.modules.DBTLedgerModule
import org.h2.jdbcx.JdbcDataSource import com.mongodb.MongoClient
import com.mongodb.MongoClientURI
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import javax.sql.DataSource
import static ratpack.groovy.Groovy.ratpack import static ratpack.groovy.Groovy.ratpack
def log = LoggerFactory.getLogger('dbt-ledger') def log = LoggerFactory.getLogger('dbt-ledger')
@@ -15,9 +14,9 @@ ratpack {
} }
bindings { bindings {
module (DBTModule) { config -> module (DBTLedgerModule) { config ->
log.info "Executor available at $config.executor.remoteUrl" log.info("Using Mongo database at $config.ledger.databaseUrl")
log.info "Ledger available at $config.ledger.remoteUrl" bindInstance(MongoClient, new MongoClient(new MongoClientURI(config.ledger.databaseUrl)))
} }
} }
} }