import com.mongodb.client.MongoCollection
import com.mongodb.client.model.BulkWriteOptions
import com.mongodb.client.model.InsertOneModel
import com.mongodb.client.model.WriteModel
import org.bson.Document
import groovy.json.*
/**
 * Resolves a MongoDB collection via the controller service referenced by Client_Service.
 *
 * @param dataBaseName   database name handed to the service's getDatabase call
 * @param collectionName collection name handed to the database's getCollection call
 * @return the object returned by getDatabase(dataBaseName).getCollection(collectionName)
 */
def getCollection(String dataBaseName, String collectionName) {
    // NOTE(review): Client_Service and MongoDBClientService are not declared or imported
    // in this script; presumably both are supplied by the hosting environment - confirm.
    def clientService = Client_Service.asControllerService(MongoDBClientService.class)
    def database = clientService.getDatabase(dataBaseName)
    database.getCollection(collectionName)
}
// Script body: parses a JSON array of documents, inserts them into a MongoDB
// collection in batches via bulkWrite, then routes the flow file to success or failure.

// NOTE(review): 'main' is never used below and Main is not defined in this view -
// confirm whether this instantiation is needed at all.
Main main = new Main()

// Settings read from externally bound properties (none of them is declared in this script).
// NOTE(review): the code below assumes Batch_Size.val is numeric and that Ordered.val /
// Bypass_Validation.val are booleans - confirm against the host configuration.
def batchSize = Batch_Size.val
def bypass = Bypass_Validation.val
def collectionName = Mongo_Collection_Name.val
def databaseName = Mongo_DataBase_Name.val
def ordered = Ordered.val
// NOTE(review): writeConcern is read but never applied to the collection or the write options.
def writeConcern = Write_Concern.val

// Number of documents handed to MongoDB so far; only reported on success.
int written = 0
// Must be declared up front: the finally block reads it on the success path, and an
// undeclared script variable that was never assigned raises MissingPropertyException.
boolean error = false

def mongoCollection = getCollection(databaseName, collectionName)

BulkWriteOptions bulkWriteOptions = new BulkWriteOptions()
bulkWriteOptions.ordered(ordered)
bulkWriteOptions.bypassDocumentValidation(bypass)

// Hard-coded sample payload.
def json = '[{"val":123456789.976543213456},{"val":987654321.1234567898765},{"val":1234567}]'

try {
    def jsonHandler = new JsonSlurper()
    List<Map<String, Object>> docs = jsonHandler.parseText(json)
    List<WriteModel<Document>> writeModels = new ArrayList<>()

    for (Map<String, Object> doc : docs) {
        writeModels.add(new InsertOneModel<>(new Document(doc)))
        // Flush as soon as the pending batch reaches the configured size.
        if (writeModels.size() == batchSize) {
            mongoCollection.bulkWrite(writeModels, bulkWriteOptions)
            written += writeModels.size()
            // Start a fresh list rather than clearing the one just passed to the driver.
            writeModels = new ArrayList<>()
        }
    }

    // Flush whatever is left over after the last full batch.
    if (writeModels.size() > 0) {
        mongoCollection.bulkWrite(writeModels, bulkWriteOptions)
        written += writeModels.size()
    }
} catch (Exception e) {
    getLogger().error("PutMongoRecord failed with error:", e)
    session.transfer(flowFile, REL_FAILURE)
    error = true
} finally {
    // Route to success only when the catch block above did not run.
    if (!error) {
        session.transfer(flowFile, REL_SUCCESS)
        getLogger().info("Written {} records into MongoDB", new Object[]{ written })
    }
}
// Tags: writeModels, bulkWriteOptions, val, import, test, new, def
// Source: https://www.cnblogs.com/INnoVationv2/p/16718608.html