第一步、开发环境:
win7 64位(注:MongoDB 在 32 位 Windows 上有数据量限制(约 2G),详见官方文档)
Mongodb3.2
mongodb_java_driver 3.2.2
第二步、安装mongodb,并开启服务
略:可参见官方文档
第三步、代码
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.LinkedList; import java.util.List; import org.bson.Document; import com.mongodb.MongoClient; import com.mongodb.MongoWriteException; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase;/*** created by soarhu 2016/4/21*/public class MongodbBatchInsetUtils {static final int ThreadNum=3;//设置向MongoDb中插入数据的线程数static int ThreadSizeCount = 0;//用于计算子线程完成数static final String HOST = "127.0.0.1";//主机static final int PORT = 27017;//端口 static final String DATABASE_NAME="mydb";//存储数据库名称,如果不存在会自动创建数据库static final String COLLECTION_NAME="md";//存储Collectionpublic static final String DIR = "E:\\targets";//扫描文件路径public static final String FILE_SUFFIX = "html";//扫描文件类型,不设置,默认为所有文件public static final String CHARSET = "UTF-8";//文件处理编码格式public static void main(String[] args) {MongoClient client =new MongoClient(HOST,PORT);MongoDatabase dataBase = client.getDatabase(DATABASE_NAME);MongoCollection<Document> collection = dataBase.getCollection(COLLECTION_NAME);Pool p = new Pool();Produce pro = new Produce(p);Long startTime = System.currentTimeMillis(); new Thread(pro).start();//开启从磁盘读取文件的线程Thread[] th = new Thread[ThreadNum];for(int i=0;i<ThreadNum;i++){//开启向mongoDb写入数据的线程Thread a = new Thread(new Customer(p,collection));a.start();th[i]=a;}boolean res=true;while(res){if(MongodbBatchInsetUtils.ThreadSizeCount==ThreadNum+1){res=false;Long endTime = System.currentTimeMillis();System.out.println("数据写入完成,吸入总数:"+p.hasUploadToDB+",共花费时间约为:"+(endTime-startTime)+"ms\n");for(Thread t:th){t.interrupt();//在子线程将数据写完后,中断子线程。 }if(null!=client){client.close();//关闭连接collection=null;dataBase=null;} } else {System.out.println("已写入数据:"+p.hasUploadToDB);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}} }//生产者,从磁盘读取数据 class Produce implements Runnable{private Pool pool=null;public 
Produce(Pool pool){this.pool= pool;}@Overridepublic void run() {getFilesInDir(MongodbBatchInsetUtils.DIR, MongodbBatchInsetUtils.FILE_SUFFIX);MongodbBatchInsetUtils.ThreadSizeCount++;System.out.println("READING FINISHED!!");}//递归读取dir目录中所有以suffix结尾的文件,若不指定文件类型,默认读取所有文件public void getFilesInDir(String dir,String suffix){if(null!=dir && dir.trim().length()>0){File file = new File(dir.trim());if(file.exists() && file.isDirectory()){File[] flist = file.listFiles();if(null!=flist && flist.length>0){for(File f:flist){if(f.isFile()){if(null==suffix|| "".equals(suffix)){pool.putFile(f);}if(null!=suffix &&suffix.trim().length()>0){if(f.getName().endsWith(suffix.trim())){pool.putFile(f);}else{throw new RuntimeException("找不到对应文件类型");}}}else{getFilesInDir(f.getAbsolutePath(),suffix);}}}else{throw new RuntimeException("文件内容为空");}}else{throw new RuntimeException("目录不存在,请检查路径正确性!");}}} }//消费者,向mongoDb中写数据 class Customer implements Runnable{private Pool pool=null;MongoCollection<Document> collection = null;public Customer(Pool pool,MongoCollection<Document> collection){this.pool = pool;this.collection = collection;}@Overridepublic void run() {while(true){File f = pool.fetchFile();if(null==f){return ;}try {saveToMonGoDb(f); // if(pool.hasUploadToDB%1000==0) // System.out.println("已写入数据:"+pool.hasUploadToDB);} catch (MongoWriteException e) {System.out.println("写入数据库异常:"+e.getMessage());return ;}if(pool.getSize()==0){System.out.println(Thread.currentThread().getName()+" :WRITTING FINISHED!!");MongodbBatchInsetUtils.ThreadSizeCount++;}}}//将文件以文件名为id,文件内容为值保存在数据库中private void saveToMonGoDb(File file){String _id = file.getName().substring(0,file.getName().lastIndexOf("."));String content = readFileContext(file, MongodbBatchInsetUtils.CHARSET);Document document = new Document("_id",_id).append("content", content);collection.insertOne(document);}//读取文件内容,以charSet编码处理public static String readFileContext(File file,String charSet) {StringBuilder sb;BufferedReader reader=null;try {reader = 
new BufferedReader(new InputStreamReader(new FileInputStream(file), charSet)); String line = null;sb = new StringBuilder();while(null!=(line = reader.readLine())){sb.append(line+"\n");}return sb.toString();}catch (Exception e) {System.out.println("文件读取失败!"+e.getMessage());}finally{try {reader.close();} catch (IOException e) {e.printStackTrace();}}return null;}}//池,缓冲区 class Pool{volatile int size=0;//缓冲区中条目数量volatile int limit =1000;volatile int hasUploadToDB=0;volatile private List<File> files = new LinkedList<File>();//入栈public synchronized void putFile(File file){while(files.size()==limit){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}files.add(file);notifyAll();++size;}//出栈public synchronized File fetchFile(){while(files.size()==0 ){try {this.wait();} catch (InterruptedException e) {return null;}}File file = null;notify();if(files.size()>0){file = files.remove(0);--size;++hasUploadToDB;}return file;}public int getSize(){return this.size;}}