728x90
반응형
* 스레드풀(CompletionService)을 이용하여 작업실행하기
- 큰 파일을 읽어 총 라인수를 리턴하는 프로그램을 병렬로 처리함.
- ExecutorService.submit() 메소드는 Future 타입을 리턴하는데 Future 타입 인스턴스로 스레드 작업결과를 확인 할 수 있다.
- Future.get() 메소드를 이용해 결과를 획득하는데. 순차적으로 실행하는것처럼 동작 때문에 병렬로 처리하는 이점이 사라진다.
-이때 CompletionService는 작업실행결과를 BlockingQueue 에 put 하기때문에
계속적으로 작업을 확인하고 처리할 수 있다.
- CompletionService.take() 메소드로 Queue 에서 Future 인스턴스를 얻어온다.
public class Manager {private final static boolean isTestMode = false;private final String dirPathT = "C:/Documents and Settings/minis24/workspace/Test1/WebContent";private final String dirPath = "/userhome/xdxdff/minis24/";private final String fileName = "project.properties";private final String logFileName = "logFile.txt";private static final List<Project> pList = new CopyOnWriteArrayList<Project>();// private static final List<Project> pList = Collections.synchronizedList(new ArrayList<Project>());// private final static ExecutorService exec= Executors.newFixedThreadPool(10);private final static ExecutorService threadPool = Executors.newFixedThreadPool(10);private final static CompletionService<Map<String, String>> exec =new ExecutorCompletionService(threadPool);private final static List<Future<Map<String, String>>> futureList =Collections.synchronizedList(new ArrayList<Future<Map<String, String>>>());/*** @return the plist*/public static List<Project> getPlist() {return pList;}public Manager() {byte[] binary = null;if(isTestMode){binary = readByteFromFile(dirPathT, fileName);}else{System.out.println("dirPath: " + dirPath +", fileName : " + fileName);binary =readByteFromFile(dirPath, fileName);}addProject(binary);}/*** 프로퍼티 파일을 로드한후 파싱하여 그 결과를 Project의 인스턴스를 담은 컬렉션에 저장한다.* 처리 스펙* 1) #으로 시작하면 주석이므로 처리하지 않음* 2) 라인이 빈문자열인경우 처리하지 않음* 3) Project 인스턴스의 코드목록에 해당하는 값이 null 인경우 프로젝트 목록에 담지 않음* 4) Project 인스턴스의 fileList의 size 가 0 인경우 프로젝트 목록에 담지 않음.* @param binary*/private void addProject(byte[] binary) {Collections.synchronizedList(pList);ByteArrayInputStream bis = new ByteArrayInputStream(binary);InputStreamReader isr = new InputStreamReader(bis);BufferedReader br = new BufferedReader(isr);String readStr = null;try {int i = 0;while((readStr = br.readLine())!= null){System.out.println("["+i+"] : S T A R T");System.out.println("******************************");if(StringUtil.trim(readStr).length()==0){// System.out.println("readStr.length()==0");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}if(readStr.startsWith("#")){// System.out.println("readStr.startsWith(#)");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}Project p = new Project(readStr);// 프로젝트 인스턴스 검증if(null == p.getPrjCode()){// System.out.println("null == p.getPrjCode()");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}if(null == p.getAppCode()){// System.out.println("null == p.getAppCode()");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}if(null == p.getShName()){// System.out.println("null == p.getShName()");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}if(null == p.getFileList()){// System.out.println("null == p.getFileList()");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}if(0 == p.getFileList().size()){// System.out.println("0 == p.getFileList().size()");// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");i++;continue;}pList.add(p);// System.out.println("******************************");// System.out.println("["+i+"] 번째 라인 처리 완료");// System.out.println("");i++;}} catch (IOException e) {e.printStackTrace();}finally{try {br.close();isr.close() ;bis.close() ;} catch (IOException e) {System.out.println(e.getCause());}}}/*** 수신파일에 대한 정보를 설정한다.*/public void checkReceiveFile() {//파일 수신여부,수신일자,파일 사이즈 설정firstLoop:for(Project p : this.getPlist()){Map<String,String> fileReceiveYnMap = new HashMap<String, String>();Map<String,String> fileReceiveDateMap = new HashMap<String, String>();Map<String,String> fileSizeMap = new HashMap<String, String>();Map<String, String> fileRowCntMap = new HashMap<String, String>();List<String> fList = p.getFileList();secondLoop:for(String filePath : fList){if("EDW".equals(filePath)){continue secondLoop;}File f = new File(filePath);// ****************************************************// 테스트를 위한 설정if(isTestMode){f= new File("C:/Documents and Settings/minis24/workspace/Test1/WebContent/WEB-INF/test.txt");}// ****************************************************if(f.isFile()){Calendar cal = Calendar.getInstance();cal.setTimeInMillis(f.lastModified());SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmm");String dateStr = df.format(cal.getTime());fileReceiveYnMap.put(filePath, "Y" );fileReceiveDateMap.put(filePath,dateStr );fileSizeMap.put(filePath, f.length()+"");}else{// System.out.println("*** f.isFile is false *** ");if(f.exists()){System.out.println("*** f.isDirectory() : " +f.isDirectory() + "***" );}else{System.out.println("*** ["+filePath +"] is not exist.. !! ***");}}p.setFileReceiveYn(fileReceiveYnMap);p.setFileReceiveDate(fileReceiveDateMap);p.setFileSize(fileSizeMap);p.setFileRowCnt(fileRowCntMap);} //end secondLoop}//end firstLoop}/*** 설정된 프로젝트 정보에서 수신파일정보를 추출하여 파일에 기록한다.* @param plist2*/public void writeReceiveFileInfo() {StringBuffer sb = new StringBuffer("");firstLoop:for(Project p : this.getPlist()){secondLoop:for(String filePath : p.getFileList()){Map<String, String> fileReceiveYnMap = p.getFileReceiveYn();if(fileReceiveYnMap == null){continue ;}if(!"Y".equals(fileReceiveYnMap.get(filePath))){continue ;}Map<String, String> fileSizeMap = p.getFileSize();Map<String, String> fileReceiveDateMap = p.getFileReceiveDate();Map<String, String> fileRowCntMap = p.getFileRowCnt();sb.append(p.getPrjCode());sb.append("\t");sb.append(fileSizeMap.get(filePath));sb.append("\t");sb.append(fileReceiveDateMap.get(filePath));sb.append("\t");sb.append(fileRowCntMap.get(filePath));sb.append("\t");sb.append(filePath);sb.append("\n");}}//end firstLoopif(isTestMode){System.out.println(sb.toString());}File logFile = null;if(isTestMode){logFile = new File(dirPathT,logFileName);}else{logFile = new File(dirPath,logFileName);}FileOutputStream fos = null;try {fos = new FileOutputStream(logFile);fos.write(sb.toString().getBytes());} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally{try {fos.close();} catch (IOException e) {System.out.println(e.getCause());}}}public void checkReceiveFileRowCnt() {try {executeSubmit();System.out.println("submit Complete!! ");addFuture();putRowCntInfo();} catch (InterruptedException e1) {e1.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}finally{threadPool.shutdown();}}private void executeSubmit() {firstLoop:for(Project p : this.getPlist()){secondLoop:for(String filePath : p.getFileList()){// ****************************************************// 테스트를 위한 설정if(isTestMode){filePath = "C:/Documents and Settings/minis24/workspace/Test1/WebContent/WEB-INF/test1.txt";}//프로젝트프로퍼티 파일에 무조건 파일로 설정하였기 때문에 존재여부만 체크하면 충분File f = new File(filePath);if(f.exists() && f.isFile()){//Callable<Map<String, String>> task = new RowCntCallableImpl(filePath);exec.submit(task);}}//end second}//end first}private void addFuture() throws InterruptedException {for (Project p :this.getPlist()){for(String filePath : p.getFileList()){File f = new File(filePath);if(!f.exists()){continue;}futureList.add(exec.take());} //second} // first}private void putRowCntInfo() throws InterruptedException, ExecutionException {//take 해서 얻어온 rowCntMap의 filePath 정보를 이용해서//set 해줄 Project 정보를 얻은 후 set해준다.for(Future<Map<String, String>> future : futureList){Map<String, String> takeMap = future.get();for(Project p : this.getPlist()){Map<String, String> rowCntMap = p.getFileRowCnt();for(String filePath : p.getFileList()){if(takeMap.get(filePath) != null){rowCntMap.put(filePath, takeMap.get(filePath));}}//third}//second}//end for first}private byte[] readByteFromFile(String filePath) {File f = new File(filePath);return readByteFromFile(f);}public byte[] readByteFromFile(String dirPath, String fileName) {File d = new File(dirPath);File f = null;if(d.isDirectory()){f = new File(d ,fileName);}return readByteFromFile(f);}private byte[] readByteFromFile(File f) {byte[] resultBynary = null;ByteArrayOutputStream baos = null;FileInputStream fis =null;try {baos = new ByteArrayOutputStream();fis = new FileInputStream(f);int readCnt=0;byte[] buf = new byte[1024];while((readCnt = fis.read(buf)) != -1){baos.write(buf, 0, readCnt);}baos.flush();resultBynary = baos.toByteArray();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}finally{try {baos.close();fis.close();} catch (IOException e) {e.printStackTrace();}}return resultBynary ;}}
728x90
반응형
'JAVA > JAVA 스레드' 카테고리의 다른 글
[java] 쓰레드의 우선순위 (0) | 2017.11.01 |
---|---|
[java] 싱글쓰레드와 멀티쓰레드 (0) | 2017.11.01 |
[java] start()와 run() (0) | 2017.11.01 |
[java] 쓰레드 구현 및 실행 (0) | 2017.11.01 |
[java] 프로세스와 쓰레드 (0) | 2017.11.01 |
Executor,ExecutorService,ThreadPoolExecutor (0) | 2016.03.09 |
자바 쓰레드 동기화 (syncronized 키워드 사용하기) (0) | 2016.03.09 |
JAVA Thread 사용하기 (0) | 2016.03.09 |