The akka-alpakka toolkit also provides a stream connector for MongoDB, which supports streaming operations against a MongoDB database. This MongoDB connector includes MongoSource, MongoFlow and MongoSink. We will only use MongoSource; the other two we build directly with mapAsync. Below is the definition of MongoSource:

object MongoSource {

  def apply(query: Observable[Document]): Source[Document, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}
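As mentioned above, the MongoFlow/MongoSink stages can be replaced by stages we assemble ourselves with mapAsync. A minimal sketch of such a write-side flow and sink, assuming an implicit ExecutionContext and a MongoCollection[Document] obtained from the client (the names insertFlow/insertSink are illustrative, not part of the connector):

  import akka.NotUsed
  import akka.stream.scaladsl.{Flow, Sink}
  import org.mongodb.scala._
  import scala.concurrent.ExecutionContext

  // insert each incoming Document, emitting it downstream once the write completes
  def insertFlow(coll: MongoCollection[Document], parallelism: Int = 1)(
    implicit ec: ExecutionContext): Flow[Document, Document, NotUsed] =
    Flow[Document].mapAsync(parallelism)(doc =>
      coll.insertOne(doc).toFuture().map(_ => doc))

  // a sink is then just the flow terminated by Sink.ignore
  def insertSink(coll: MongoCollection[Document], parallelism: Int = 1)(
    implicit ec: ExecutionContext): Sink[Document, NotUsed] =
    insertFlow(coll, parallelism).to(Sink.ignore)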

In effect it converts a Mongo-Scala Observable[Document] into a Source[Document, NotUsed]. As before, we construct this Source by passing in a context:

  case class MGOContext(
    dbName: String,
    collName: String,
    action: MGOCommands = null
  ) {...}

  case class DocumentStream(filter: Option[Bson] = None,
    andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
  ) extends MGOCommands
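The andThen field lets the caller chain FindObservable modifiers such as sort, limit or projection onto the query before it becomes a Source. For example, a hypothetical context that sorts by ponum and caps the result size (the field and collection names here are only for illustration):

  import org.mongodb.scala.FindObservable
  import org.mongodb.scala.model.Sorts._

  // sort descending on "ponum" and take at most 10 documents
  val sortAndLimit: FindObservable[Document] => FindObservable[Document] =
    find => find.sort(descending("ponum")).limit(10)

  val demoCtx = MGOContext("testdb", "po").setCommand(
    DocumentStream(filter = None, andThen = Some(sortAndLimit)))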

The concrete implementation of the Source:

  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
      case DocumentStream(None, None) =>
        MongoSource(coll.find())
      case DocumentStream(Some(filter), None) =>
        MongoSource(coll.find(filter))
      case DocumentStream(None, Some(next)) =>
        MongoSource(next(coll.find()))
      case DocumentStream(Some(filter), Some(next)) =>
        MongoSource(next(coll.find(filter)))
    }
  }

Below is a usage demonstration of mongoStream:

  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  implicit val client = MongoClient(clientSettings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  case class PO (
ponum: String,
podate: MGODate,
vendor: String,
remarks: Option[String],
podtl: Option[MGOArray]
)
def toPO(doc: Document): PO = {
PO(
ponum = doc.getString("ponum"),
podate = doc.getDate("podate"),
vendor = doc.getString("vendor"),
remarks = mgoGetStringOrNone(doc,"remarks"),
podtl = mgoGetArrayOrNone(doc,"podtl")
)
}
case class PODTL(
item: String,
price: Double,
qty: Int,
packing: Option[String],
payTerm: Option[String]
)
def toPODTL(podtl: Document): PODTL = {
PODTL(
item = podtl.getString("item"),
price = podtl.getDouble("price"),
qty = podtl.getInteger("qty"),
packing = mgoGetStringOrNone(podtl,"packing"),
payTerm = mgoGetStringOrNone(podtl,"payterm")
)
  }

  def showPO(po: PO) = {
println(s"po number: ${po.ponum}")
println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
println(s"vendor: ${po.vendor}")
if (po.remarks != None)
println(s"remarks: ${po.remarks.get}")
po.podtl match {
case Some(barr) =>
mgoArrayToDocumentList(barr)
.map { dc => toPODTL(dc)}
.foreach { doc: PODTL =>
print(s"==>Item: ${doc.item} ")
print(s"price: ${doc.price} ")
print(s"qty: ${doc.qty} ")
doc.packing.foreach(pk => print(s"packing: ${pk} "))
doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
println("")
}
case _ =>
}
}
  import org.mongodb.scala.model.Projections._
  import MongoActionStream._
  import MGOEngine._
  import akka.stream.scaladsl.{Sink, Source}

  val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
  val ctx = MGOContext("testdb","po").setCommand(
    DocumentStream(filter = None, andThen = Some(proj)))

  val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))
  println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

As we can see, quite a lot of code goes into type conversion. There is no great way around it, though at least it is a one-off effort. We can also use an Akka Flow[A,B] to update MongoDB data, using each stream element A as input:

 object MongoActionStream {

  import MGOContext._

  case class StreamingInsert[A](dbName: String,
                                collName: String,
                                converter: A => Document,
                                parallelism: Int = 1   // default elided in the source; 1 assumed
                               ) extends MGOCommands

  case class StreamingDelete[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands

  case class StreamingUpdate[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                toUpdate: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands

  case class InsertAction[A](ctx: StreamingInsert[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
      Flow[A].map(ctx.converter)
        .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  }

  case class UpdateAction[A](ctx: StreamingUpdate[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  }

  case class DeleteAction[A](ctx: StreamingDelete[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  }
}

Below is a demonstration of insert, update and delete operations. In this demo we invoke JDBCEngine, CassandraEngine and MongoDBEngine together:

  import jdbcengine._
  import JDBCEngine._
  import scalikejdbc._

  case class DataRow (
    rowid: Long,
    measureid: Long,
    state: String,
    county: String,
    year: Int,
    value: Int
  )

  val toRow: WrappedResultSet => DataRow = rs => DataRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    year = rs.int("REPORTYEAR"),
    value = rs.int("VALUE")
  )

  //construct the context
  val h2ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AQMRPT",
    extractor = toRow
  )

  //source from h2 database
  val jdbcSource = jdbcAkkaStream(h2ctx)

  //document converter
  def rowToDoc: DataRow => Document = row => Document (
    "rowid" -> row.rowid,
    "measureid" -> row.measureid,
    "state" -> row.state,
    "county" -> row.county,
    "year" -> row.year,
    "value" -> row.value
  )
  def docToRow: Document => DataRow = doc => DataRow (
    rowid = doc.getLong("rowid"),
    measureid = doc.getLong("measureid"),
    state = doc.getString("state"),
    county = doc.getString("county"),
    year = doc.getInteger("year"),
    value = doc.getInteger("value")
  )

  //setup context
  val mgoctx = StreamingInsert("testdb","members",rowToDoc)
  val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
  val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
  val sink = Sink.foreach[DataRow]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }

  //config jdbc drivers
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()   // row count elided in the source; 100 assumed

  val mgoCtxShow = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))
  mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

  import com.datastax.driver.core._
  import cassandraengine._
  import CQLEngine._
  import org.mongodb.scala.model.Filters._

  //data row converter
  val cqlToDataRow = (rs: Row) => DataRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE")
  )

  import org.bson.conversions._
  import org.mongodb.scala.model.Updates._

  //#init-session
  implicit val session = Cluster.builder
    .addContactPoint("127.0.0.1")
    .withPort(9042)   // port elided in the source; default Cassandra port assumed
    .build
    .connect()

  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
  //construct source
  val cqlSource = cassandraStream(cqlCtx)

  def toFilter: DataRow => Bson = row => {
    and(equal("rowid",row.rowid), lt("value",10))   // threshold elided in the source; 10 assumed
  }
  def toUpdate: DataRow => Bson = row => {
    set("value" , row.value * 10)   // factor elided in the source; 10 assumed
  }
  val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
  val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
  val stsUpdate = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()

  import org.bson.conversions._
  import org.mongodb.scala.model.Filters._
  def toDelFilter: DataRow => Bson = row =>
    and(equal("rowid",row.rowid),equal("value",0))   // value elided in the source; 0 assumed

  val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
  val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
  val mgoCtxSrc = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))
  mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run()

  import org.mongodb.scala.model.Sorts._
  val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
  val mgoCtxShowDesc = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None, andThen = Some(sortDsc)))
  mongoStream(mgoCtxShowDesc).map(docToRow).to(sink).run()

Below is the complete source code for this demonstration:

build.sbt

name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
"org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17",
"com.typesafe.akka" %% "akka-actor" % "2.5.4",
"com.typesafe.akka" %% "akka-stream" % "2.5.4",
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
"com.h2database" % "h2" % "1.4.196",
"mysql" % "mysql-connector-java" % "6.0.6",
"org.postgresql" % "postgresql" % "42.2.0",
"commons-dbcp" % "commons-dbcp" % "1.4",
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"com.typesafe.slick" %% "slick" % "3.2.1",
"ch.qos.logback" % "logback-classic" % "1.2.3"
)

resources/application.conf

# JDBC settings
test {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
# pool sizing values elided in the source; typical values assumed
poolInitialSize = 5
poolMaxSize = 100
poolConnectionTimeoutMillis = 3000
poolValidationQuery = "select 1 as one"
poolFactoryName = "commons-dbcp2"
}
}

db.mysql.driver = "com.mysql.cj.jdbc.Driver"
db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
db.mysql.user = "root"
db.mysql.password = ""
# pool sizing values elided in the source; typical values assumed
db.mysql.poolInitialSize = 5
db.mysql.poolMaxSize = 100
db.mysql.poolConnectionTimeoutMillis = 3000
db.mysql.poolValidationQuery = "select 1 as one"
db.mysql.poolFactoryName = "bonecp"

# scalikejdbc global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 3000 # value elided in the source; 3000 assumed
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 # value elided in the source; 10 assumed
}
dev {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolFactoryName = "hikaricp"
numThreads = 10 # pool sizing elided in the source; typical values assumed
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
mysql {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/testdb"
user = "root"
password = ""
# pool sizing values elided in the source; typical values assumed
poolInitialSize = 5
poolMaxSize = 100
poolConnectionTimeoutMillis = 3000
poolValidationQuery = "select 1 as one"
poolFactoryName = "bonecp"
}
postgres {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/testdb"
user = "root"
password = ""
poolFactoryName = "hikaricp"
numThreads = 10 # pool sizing elided in the source; typical values assumed
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
}
# scalikejdbc global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 3000 # value elided in the source; 3000 assumed
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 # value elided in the source; 10 assumed
}

JDBCEngine.scala

package jdbcengine
import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import scala.util._
import java.time._
import scala.concurrent.duration._
import filestreaming.FileStreaming._
import scalikejdbc.TxBoundary.Try._

import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL = 1    // constant values elided in the source; 1 and 2 assumed
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

case class JDBCQueryContext[M](
dbName: Symbol,
statement: String,
parameters: Seq[Any] = Nil,
  fetchSize: Int = 100,   // default elided in the source; 100 assumed
autoCommit: Boolean = false,
queryTimeout: Option[Int] = None,
  extractor: WrappedResultSet => M)

case class JDBCContext(
dbName: Symbol,
statements: Seq[String] = Nil,
parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,   // default elided in the source; 100 assumed
queryTimeout: Option[Int] = None,
queryTags: Seq[String] = Nil,
sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
batch: Boolean = false,
returnGeneratedKey: Seq[Option[Any]] = Nil,
// no return: None, return by index: Some(1), by name: Some("id")
preAction: Option[PreparedStatement => Unit] = None,
  postAction: Option[PreparedStatement => Unit] = None) { ctx =>

  //helper functions
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
ctx.copy(preAction = action)
else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
ctx.copy(postAction = action)
else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(Seq(_parameters))
)
} else
throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))   // key index elided in the source; 1 assumed
)
} else
throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") var matchParams = true
if (ctx.parameters != Nil)
if (ctx.parameters.head.size != _parameters.size)
matchParams = false
if (matchParams) {
ctx.copy(
parameters = ctx.parameters ++ Seq(_parameters)
)
} else
throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
)
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
sqlType = JDBCContext.SQL_EXEDDL,
batch = false
)
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
sqlType = JDBCContext.SQL_UPDATE,
batch = false
)
}
def setBatchCommand(_statement: String): JDBCContext = {
ctx.copy (
statements = Seq(_statement),
sqlType = JDBCContext.SQL_UPDATE,
batch = true
)
  }

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
  def jdbcSetNow = LocalDateTime.now()

  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 10.seconds)(   // timeout elided in the source; 10s assumed
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) = InputStreamToFile(blob,fileName)
}

object JDBCEngine {

  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
    (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
      sql.iterator
.withDBSessionForceAdjuster(session => {
session.connection.setAutoCommit(ctx.autoCommit)
session.fetchSize(ctx.fetchSize)
})
}
Source.fromPublisher[A](publisher)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
    ctx: JDBCQueryContext[A])(
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
rawSql.fetchSize(ctx.fetchSize)
implicit val session = NamedAutoSession(ctx.dbName)
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
    sql.collection.apply[C]()
  }

  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
if (ctx.sqlType != SQL_EXEDDL) {
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
}
else {
NamedDB(ctx.dbName) localTx { implicit session =>
Try {
ctx.statements.foreach { stm =>
val ddl = new SQLExecution(statement = stm, parameters = Nil)(
before = WrappedResultSet => {})(
            after = WrappedResultSet => {})
          ddl.apply()
}
"SQL_EXEDDL executed succesfully."
}
}
}
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
if (ctx.statements == Nil)
throw new IllegalStateException("JDBCContex setting error: statements empty!")
if (ctx.sqlType != SQL_UPDATE) {
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
}
else {
if (ctx.batch) {
if (noReturnKey(ctx)) {
val usql = SQL(ctx.statements.head)
.tags(ctx.queryTags: _*)
.batch(ctx.parameters: _*)
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
usql.apply[Seq]()
Seq.empty[Long].to[C]
}
}
} else {
val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
usql.apply[C]()
}
}
        }
      } else {
Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
}
}
}
private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
val Some(key) :: xs = ctx.returnGeneratedKey
val params: Seq[Any] = ctx.parameters match {
case Nil => Nil
case p@_ => p.head
}
val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val result = usql.apply()
Seq(result).to[C]
}
}
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
val params: Seq[Any] = ctx.parameters match {
case Nil => Nil
case p@_ => p.head
}
    val before = ctx.preAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
Try {
NamedDB(ctx.dbName) localTx {implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val result = usql.apply()
Seq(result.toLong).to[C]
}
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
if (noReturnKey(ctx))
singleTxUpdateNoReturnKey(ctx)
else
singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
if (ctx.returnGeneratedKey != Nil) {
val k :: xs = ctx.returnGeneratedKey
k match {
case None => true
case Some(k) => false
}
} else true
  }

  def noActon: PreparedStatement => Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
case Nil => Seq.fill(ctx.statements.size)(None)
case k@_ => k
}
val sqlcmd = ctx.statements zip ctx.parameters zip keys
val results = sqlcmd.map { case ((stm, param), key) =>
key match {
case None =>
new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
case Some(k) =>
new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
}
}
results.to[C]
}
}
  }

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
if (ctx.statements == Nil)
throw new IllegalStateException("JDBCContex setting error: statements empty!")
if (ctx.sqlType != SQL_UPDATE) {
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
}
else {
if (!ctx.batch) {
      if (ctx.statements.size == 1)
singleTxUpdate(ctx)
else
multiTxUpdates(ctx)
} else
Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) }
} case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = , processInOrder: Boolean = true,
statement: String, prepareParams: R => Seq[Any]) {
jas =>
def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R) = {
import scala.concurrent._
val params = prepareParams(r)
NamedDB(dbName) autoCommit { session =>
session.execute(statement,params: _*)
}
Future.successful(r)
}
def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =
if (processInOrder)
Flow[R].mapAsync(parallelism)(perform)
else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }
object JDBCActionStream {
def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  }
}

CassandraEngine.scala

package cassandraengine
import com.datastax.driver.core._

import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}

import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import filestreaming.FileStreaming._

object CQLContext {
// Consistency Levels
type CONSISTENCY_LEVEL = Int
val ANY: CONSISTENCY_LEVEL = 0x0000
val ONE: CONSISTENCY_LEVEL = 0x0001
val TWO: CONSISTENCY_LEVEL = 0x0002
val THREE: CONSISTENCY_LEVEL = 0x0003
val QUORUM : CONSISTENCY_LEVEL = 0x0004
val ALL: CONSISTENCY_LEVEL = 0x0005
val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL = 0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
consistency match {
case ALL => ConsistencyLevel.ALL
case ONE => ConsistencyLevel.ONE
case TWO => ConsistencyLevel.TWO
case THREE => ConsistencyLevel.THREE
case ANY => ConsistencyLevel.ANY
case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
case QUORUM => ConsistencyLevel.QUORUM
case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }
}
case class CQLQueryContext[M](
statement: String,
extractor: Row => M,
parameter: Seq[Object] = Nil,
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  fetchSize: Int = 100   // default elided in the source; 100 assumed
) { ctx =>
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
ctx.copy(consistency = Some(_consistency))
def setFetchSize(pageSize: Int): CQLQueryContext[M] =
ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {
def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
new CQLQueryContext[M](statement = stmt, extractor = converter)
}

case class CQLContext(
statements: Seq[String],
parameters: Seq[Seq[Object]] = Nil,
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
) { ctx =>

  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
ctx.copy(consistency = Some(_consistency))
def setCommand(_statement: String, _parameters: Object*): CQLContext =
ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
def appendCommand(_statement: String, _parameters: Object*): CQLContext =
ctx.copy(statements = ctx.statements :+ _statement,
parameters = ctx.parameters ++ Seq(_parameters))
}

object CQLEngine {
import CQLContext._
  import CQLHelpers._

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(   // default elided in the source; 100 assumed
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A]) = {

    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
if (ctx.parameter != Nil) {
val params = processParameters(ctx.parameter)
boundStmt = prepStmt.bind(params:_*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
(resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])
}
def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
if (resultSet.isFullyFetched) {
(resultSet, None)
} else {
try {
val result = Await.result(resultSet.fetchMoreResults(), timeOut)
(result, Some((result.asScala.view.map(extractor)).to[C]))
} catch { case e: Throwable => (resultSet, None) }
}
def cqlExecute(ctx: CQLContext)(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
cqlSingleUpdate(ctx)
else
cqlMultiUpdate(ctx)
}
def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val prepStmt = session.prepare(ctx.statements.head)
    var boundStmt = prepStmt.bind()
if (ctx.parameters != Nil) {
val params = processParameters(ctx.parameters.head)
boundStmt = prepStmt.bind(params:_*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
session.executeAsync(boundStmt).map(_.wasApplied())
}
def cqlMultiUpdate(ctx: CQLContext)(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
var batch = new BatchStatement()
commands.foreach { case (stm, params) =>
val prepStmt = session.prepare(stm)
if (params == Nil)
batch.add(prepStmt.bind())
else {
val p = processParameters(params)
batch.add(prepStmt.bind(p: _*))
}
}
ctx.consistency.foreach {consistency =>
batch.setConsistencyLevel(consistencyLevel(consistency))}
session.executeAsync(batch).map(_.wasApplied())
  }

  def cassandraStream[A](ctx: CQLQueryContext[A])
    (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
var boundStmt = prepStmt.bind()
val params = processParameters(ctx.parameter)
boundStmt = prepStmt.bind(params:_*)
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }

    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
  }

  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,   // default elided in the source; 1 assumed
statement: String, prepareParams: R => Seq[Object],
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
val prepStmt = session.prepare(statement)
var boundStmt = prepStmt.bind()
val params = processParameters(prepareParams(r))
boundStmt = prepStmt.bind(params:_*)
consistency.foreach { cons =>
boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
}
session.executeAsync(boundStmt).map(_ => r)
}
def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
if (processInOrder)
Flow[R].mapAsync(parallelism)(perform)
else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }
object CassandraActionStream {
def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
new CassandraActionStream[R]( statement=_statement, prepareParams = params)
  }
}
object CQLHelpers {
import java.nio.ByteBuffer
import com.datastax.driver.core.LocalDate
import java.time.Instant
import akka.stream._
  import scala.concurrent.duration._

  implicit def listenableFutureToFuture[T](
listenableFuture: ListenableFuture[T]): Future[T] = {
val promise = Promise[T]()
Futures.addCallback(listenableFuture, new FutureCallback[T] {
def onFailure(error: Throwable): Unit = {
promise.failure(error)
()
}
def onSuccess(result: T): Unit = {
promise.success(result)
()
}
})
promise.future
  }

  type CQLBlob = ByteBuffer
case class CQLDate(year: Int, month: Int, day: Int)
case object CQLTodayDate
  case class CQLDateTime(year: Int, month: Int,
    day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
import java.time.{Clock,ZoneId}
params.map { obj =>
obj match {
case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
case CQLTodayDate =>
val today = java.time.LocalDate.now()
LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
        case CQLDateTime(yy, mm, dd, hr, min, sec, ms) =>
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$min%02d:$sec%02d.$ms%03dZ")
case p@_ => p
}
}
  }

  def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 10.seconds)(   // timeout elided in the source; 10s assumed
    implicit mat: Materializer) = FileToByteBuffer(fileName,timeOut)

  def cqlBlobToFile(blob: CQLBlob, fileName: String)(
    implicit mat: Materializer) = ByteBufferToFile(blob,fileName)
}

MongoEngine.scala

import java.text.SimpleDateFormat

import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendar

import scala.collection.JavaConverters._
import filestreaming.FileStreaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}

import scala.concurrent._
import scala.concurrent.duration._

object MGOContext {

  trait MGOCommands

  object MGOCommands {

    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

    /* org.mongodb.scala.FindObservable
import com.mongodb.async.client.FindIterable
val resultDocType = FindIterable[Document]
val resultOption = FindObservable(resultDocType)
.maxScan(...)
.limit(...)
.sort(...)
.project(...) */
    case class Find[M](filter: Option[Bson] = None,
                       andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                       converter: Option[Document => M] = None,
                       firstOnly: Boolean = false) extends MGOCommands

    case class DocumentStream(filter: Option[Bson] = None,
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
                             ) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  }

  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  }

  case class MGOContext(
dbName: String,
collName: String,
action: MGOCommands = null
) {
ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)
  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
    def apply(db: String, coll: String, command: MGOCommands) =
      new MGOContext(db, coll, command)
  }

  type MGODate = java.util.Date
def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
val ca = Calendar.getInstance()
ca.set(yyyy,mm,dd)
ca.getTime()
}
def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
val ca = Calendar.getInstance()
ca.set(yyyy,mm,dd,hr,min,sec)
ca.getTime()
}
def mgoDateTimeNow: MGODate = {
val ca = Calendar.getInstance()
ca.getTime
  }

  def mgoDateToString(dt: MGODate, formatString: String): String = {
val fmt= new SimpleDateFormat(formatString)
fmt.format(dt)
  }

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 10.seconds)(   // timeout elided in the source; 10s assumed
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)

  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getString(fieldName))
else None
}
def mgoGetIntOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getInteger(fieldName))
else None
}
def mgoGetLongOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getLong(fieldName))
else None
}
def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getDouble(fieldName))
else None
}
def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getBoolean(fieldName))
else None
}
def mgoGetDateOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getDate(fieldName))
else None
}
def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
else None
}
def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
doc.get(fieldName).asInstanceOf[Option[MGOArray]]
else None
  }

  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
(arr.getValues.asScala.toList)
.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }

  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}
object MGOEngine {

  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
ctx.action match {
/* count */
case Count(Some(filter), Some(opt)) =>
coll.count(filter, opt.asInstanceOf[CountOptions])
.toFuture().asInstanceOf[Future[T]]
case Count(Some(filter), None) =>
coll.count(filter).toFuture()
.asInstanceOf[Future[T]]
case Count(None, None) =>
coll.count().toFuture()
.asInstanceOf[Future[T]]
/* distinct */
case Distict(field, Some(filter)) =>
coll.distinct(field, filter).toFuture()
.asInstanceOf[Future[T]]
case Distict(field, None) =>
coll.distinct((field)).toFuture()
.asInstanceOf[Future[T]]
/* find */
case Find(None, None, optConv, false) =>
if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(None, None, optConv, true) =>
if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
case Find(Some(filter), None, optConv, false) =>
if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), None, optConv, true) =>
if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
case Find(None, Some(next), optConv, _) =>
if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), Some(next), optConv, _) =>
if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
/* aggregate */
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
/* mapReduce */
case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
/* insert */
case Insert(docs, Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
.asInstanceOf[Future[T]]
else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
.asInstanceOf[Future[T]]
case Insert(docs, None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
/* delete */
case Delete(filter, None, onlyOne) =>
if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
case Delete(filter, Some(opt), onlyOne) =>
if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
/* replace */
case Replace(filter, replacement, None) =>
coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
case Replace(filter, replacement, Some(opt)) =>
coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
/* update */
case Update(filter, update, None, onlyOne) =>
if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
case Update(filter, update, Some(opt), onlyOne) =>
if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
/* bulkWrite */
case BulkWrite(commands, None) =>
coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
case BulkWrite(commands, Some(opt)) =>
        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

      /* drop collection */
case DropCollection(collName) =>
val coll = db.getCollection(collName)
coll.drop().toFuture().asInstanceOf[Future[T]]
/* create collection */
case CreateCollection(collName, None) =>
db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
case CreateCollection(collName, Some(opt)) =>
db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
/* list collection */
case ListCollection(dbName) =>
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
/* create view */
case CreateView(viewName, viewOn, pline, None) =>
db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
case CreateView(viewName, viewOn, pline, Some(opt)) =>
db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
/* create index */
case CreateIndex(key, None) =>
coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
case CreateIndex(key, Some(opt)) =>
coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
/* drop index */
case DropIndexByName(indexName, None) =>
coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
case DropIndexByName(indexName, Some(opt)) =>
coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
case DropIndexByKey(key, None) =>
coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
case DropIndexByKey(key, Some(opt)) =>
coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
case DropAllIndexes(None) =>
coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
case DropAllIndexes(Some(opt)) =>
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
}
  }

  def mongoStream(ctx: MGOContext)(
implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
ctx.action match {
case DocumentStream(None, None) =>
MongoSource(coll.find())
case DocumentStream(Some(filter), None) =>
MongoSource(coll.find(filter))
case DocumentStream(None, Some(next)) =>
MongoSource(next(coll.find()))
case DocumentStream(Some(filter), Some(next)) =>
MongoSource(next(coll.find(filter)))
}
  }
}

object MongoActionStream {

  import MGOContext._

  case class StreamingInsert[A](dbName: String,
collName: String,
converter: A => Document,
                                parallelism: Int = 1   // default elided in the source; 1 assumed
                               ) extends MGOCommands

  case class StreamingDelete[A](dbName: String,
collName: String,
toFilter: A => Bson,
                                parallelism: Int = 1,
justOne: Boolean = false
                               ) extends MGOCommands

  case class StreamingUpdate[A](dbName: String,
collName: String,
toFilter: A => Bson,
toUpdate: A => Bson,
                                parallelism: Int = 1,
justOne: Boolean = false
                               ) extends MGOCommands

  case class InsertAction[A](ctx: StreamingInsert[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
Flow[A].map(ctx.converter)
.mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  }

  case class UpdateAction[A](ctx: StreamingUpdate[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
if (ctx.justOne) {
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
} else
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  }

  case class DeleteAction[A](ctx: StreamingDelete[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
if (ctx.justOne) {
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
} else
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  }
}
import org.mongodb.scala._
import scala.concurrent._
import scala.concurrent.duration._

object MGOHelpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), 10.seconds)   // timeouts elided in the source; 10s assumed
    def headResult() = Await.result(observable.head(), 10.seconds)
def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
results().foreach(res => println(converter(res)))
}
def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

  def getResult[T](fut: Future[T], timeOut: Duration = 10.seconds): T = {
Await.result(fut,timeOut)
}
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 10.seconds): Iterable[T] = {
Await.result(fut,timeOut)
  }
}

FileStreaming.scala

package filestreaming
import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}

import scala.concurrent.Await
import akka.util._
import scala.concurrent.duration._

object FileStreaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 10.seconds)(   // timeouts elided in the source; 10s assumed
implicit mat: Materializer):ByteBuffer = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toByteBuffer
  }

  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 10.seconds)(
implicit mat: Materializer): Array[Byte] = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toArray
  }

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 10.seconds)(
implicit mat: Materializer): InputStream = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
val buf = (Await.result(fut, timeOut)).toArray
new ByteArrayInputStream(buf)
  }

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
implicit mat: Materializer) = {
val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba, 0, ba.length)
val baInput = new ByteArrayInputStream(ba)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
implicit mat: Materializer) = {
val bb = ByteBuffer.wrap(bytes)
val baInput = new ByteArrayInputStream(bytes)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def InputStreamToFile(is: InputStream, fileName: String)(
implicit mat: Materializer) = {
val source = StreamConverters.fromInputStream(() => is)
source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
}


HikariCPool.scala

package jdbcengine
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
val props = new Properties(null)
m.foreach { case (k, cv) =>
val v =
if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
else if(cv.unwrapped eq null) null
else cv.unwrapped.toString
if(v ne null) props.put(k, v)
}
props
}
toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
def getStringOpt(path: String) = Option(getStringOr(path))
def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
if (c.hasPath("dataSourceClass")) {
hconf.setDataSourceClassName(c.getString("dataSourceClass"))
} else {
Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
}
hconf.setJdbcUrl(c.getStringOr("url", null))
c.getStringOpt("user").foreach(hconf.setUsername)
c.getStringOpt("password").foreach(hconf.setPassword)
c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) // Pool configuration
hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", ))
hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", ))
hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", ))
hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", ))
hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", ))
hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
val numThreads = c.getIntOr("numThreads", )
hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * ))
hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
hconf.setReadOnly(c.getBooleanOr("readOnly", false))
c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf
  }
}

import scalikejdbc._
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
getFactoryName(dbName) match {
case "hikaricp" => {
val hconf = hikariCPConfig(dbName)
val hikariCPSource = new HikariDataSource(hconf)
if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
Class.forName(hconf.getDriverClassName)
}
ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
}
case _ => {
val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
val cpSettings = readConnectionPoolSettings(dbName)
if (driver != null && driver.trim.nonEmpty) {
Class.forName(driver)
}
ConnectionPool.add(dbName, url, user, password, cpSettings)
}
}
  }

  def setupAll(): Unit = {
loadGlobalSettings()
dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
ConnectionPool.close(dbName)
}

def closeAll(): Unit = {
ConnectionPool.closeAll
}
}

object ConfigDBs extends ConfigDBs
with TypesafeConfigReader
with StandardTypesafeConfig
with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
with TypesafeConfigReader
with StandardTypesafeConfig
with HikariConfigReader
with EnvPrefix {

override val env = Option(envValue)
}
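With ConfigDBsWithEnv("dev"), every config lookup is prefixed with the environment name, so the 'h2 pool used in the demo below is read from the dev.db.h2 path of application.conf. A minimal HOCON sketch could look like the following; the url, driver and pool values here are illustrative assumptions, not taken from the original project:

  dev {
    db {
      h2 {
        poolFactoryName = "hikaricp"   // selects the HikariCP branch of setup()
        url = "jdbc:h2:tcp://localhost/~/testdb"
        driver = "org.h2.Driver"
        user = "sa"
        password = ""
        numThreads = 10                // pool sizing read by hikariCPConfig
        maxConnections = 50
        connectionTimeout = 30s
      }
    }
  }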

MongoStreamDemo.scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._
import org.mongodb.scala.connection._
import scala.collection.JavaConverters._

object MongoStream extends App {
import MGOContext._
import MGOEngine._
import MGOCommands._
import MGOHelpers._

val clusterSettings = ClusterSettings.builder()
.hosts(List(new ServerAddress("localhost:27017")).asJava).build()
val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
implicit val client = MongoClient(clientSettings)

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher

case class PO (
ponum: String,
podate: MGODate,
vendor: String,
remarks: Option[String],
podtl: Option[MGOArray]
)
def toPO(doc: Document): PO = {
PO(
ponum = doc.getString("ponum"),
podate = doc.getDate("podate"),
vendor = doc.getString("vendor"),
remarks = mgoGetStringOrNone(doc,"remarks"),
podtl = mgoGetArrayOrNone(doc,"podtl")
)
}
case class PODTL(
item: String,
price: Double,
qty: Int,
packing: Option[String],
payTerm: Option[String]
)
def toPODTL(podtl: Document): PODTL = {
PODTL(
item = podtl.getString("item"),
price = podtl.getDouble("price"),
qty = podtl.getInteger("qty"),
packing = mgoGetStringOrNone(podtl,"packing"),
payTerm = mgoGetStringOrNone(podtl,"payterm")
)
}

def showPO(po: PO) = {
println(s"po number: ${po.ponum}")
println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
println(s"vendor: ${po.vendor}")
if (po.remarks != None)
println(s"remarks: ${po.remarks.get}")
po.podtl match {
case Some(barr) =>
mgoArrayToDocumentList(barr)
.map { dc => toPODTL(dc)}
.foreach { doc: PODTL =>
print(s"==>Item: ${doc.item} ")
print(s"price: ${doc.price} ")
print(s"qty: ${doc.qty} ")
doc.packing.foreach(pk => print(s"packing: ${pk} "))
doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
println("")
}
case _ =>
}
}
import org.mongodb.scala.model.Projections._
import MongoActionStream._
import MGOEngine._
import akka.stream.scaladsl.{Sink, Source}

val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
val ctx = MGOContext("testdb","po").setCommand(
DocumentStream(filter = None, andThen = Some(proj)))

val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))
println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

import jdbcengine._
import JDBCEngine._
import scalikejdbc._

case class DataRow (
rowid: Long,
measureid: Long,
state: String,
county: String,
year: Int,
value: Int
)
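// Below: stream rows from the H2 table AQMRPT into the MongoDB "members"
// collection, converting each JDBC result row into a Document on the way.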
val toRow: WrappedResultSet => DataRow = rs => DataRow(
rowid = rs.long("ROWID"),
measureid = rs.long("MEASUREID"),
state = rs.string("STATENAME"),
county = rs.string("COUNTYNAME"),
year = rs.int("REPORTYEAR"),
value = rs.int("VALUE")
)

//construct the context
val h2ctx = JDBCQueryContext[DataRow](
dbName = 'h2,
statement = "select * from AQMRPT",
extractor = toRow
)

//source from h2 database
val jdbcSource = jdbcAkkaStream(h2ctx)

//document converter
def rowToDoc: DataRow => Document = row => Document (
"rowid" -> row.rowid,
"measureid" -> row.measureid,
"state" -> row.state,
"county" -> row.county,
"year" -> row.year,
"value" -> row.value
)
def docToRow: Document => DataRow = doc => DataRow (
rowid = doc.getLong("rowid"),
measureid = doc.getLong("measureid"),
state = doc.getString("state"),
county = doc.getString("county"),
year = doc.getInteger("year"),
value = doc.getInteger("value")
)
//setup context
val mgoctx = StreamingInsert("testdb","members",rowToDoc)
val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
val sink = Sink.foreach[DataRow]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}

//config jdbc drivers
ConfigDBsWithEnv("dev").setup('h2)
ConfigDBsWithEnv("dev").loadGlobalSettings() val sts = jdbcSource.take().via(mgoActionFlow).to(sink).run() val mgoCtxPrint = MGOContext("testdb","members").setCommand(
DocumentStream(filter = None))
mongoStream(mgoCtxPrint).map(docToRow).to(sink).run()

import com.datastax.driver.core._
import cassandraengine._
import CQLEngine._
import org.mongodb.scala.model.Filters._

//data row converter
val cqlToDataRow = (rs: Row) => DataRow(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
state = rs.getString("STATENAME"),
county = rs.getString("COUNTYNAME"),
year = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE")
)

import org.bson.conversions._
import org.mongodb.scala.model.Updates._

//#init-session
implicit val session = Cluster.builder
.addContactPoint("127.0.0.1")
.withPort(9042) // 9042 = default CQL native-protocol port (assumed)
.build
.connect()

//setup context
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
//construct source
val cqlSource = cassandraStream(cqlCtx)

def toFilter: DataRow => Bson = row => {
and(equal("rowid",row.rowid), lt("value",))
}
def toUpdate: DataRow => Bson = row => {
set("value" , row.value * )
}
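// Below: read rows from Cassandra and update every matching MongoDB
// document (toFilter selects it, toUpdate rewrites its "value" field).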
val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()

import org.bson.conversions._
import org.mongodb.scala.model.Filters._
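// Below: use the "members" collection itself as the source and delete
// every document matched by toDelFilter.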
def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",0)) // comparison value assumed

val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
val mgoCtxSrc = MGOContext("testdb","members").setCommand(
DocumentStream(filter = None))
mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run()

import org.mongodb.scala.model.Sorts._
val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
val mgoCtxShow = MGOContext("testdb","members").setCommand(
DocumentStream(filter = None, andThen = Some(sortDsc)))
mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

scala.io.StdIn.readLine()
system.terminate()
}
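To compile and run MongoStreamDemo.scala you need the Akka-stream, mongo-scala-driver, Cassandra-driver, ScalikeJDBC, HikariCP and H2 artifacts on the classpath. A build.sbt sketch along these lines should work; the version numbers are assumptions matching the era of these libraries, not taken from the original project:

  // build.sbt sketch -- versions are assumptions, adjust to your project
  name := "mongo-stream-demo"
  scalaVersion := "2.12.4"

  libraryDependencies ++= Seq(
    "com.typesafe.akka"      %% "akka-stream"           % "2.5.11",
    "org.mongodb.scala"      %% "mongo-scala-driver"    % "2.2.1",
    "com.datastax.cassandra" %  "cassandra-driver-core" % "3.4.0",
    "org.scalikejdbc"        %% "scalikejdbc"           % "3.2.1",
    "com.zaxxer"             %  "HikariCP"              % "2.7.8",
    "com.h2database"         %  "h2"                    % "1.4.196"
  )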
