2018年9月12日 星期三

非同步Servlet3.0實作聊天室-整合FB登入 記錄

1.後端Servlet
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.gorilla.entity.LEStreamChannelHistory;
import com.gorilla.webPortal.WebPortalConstant;
import com.gorilla.webPortal.web.api.view.object.StreamChannel;
import com.gorilla.webPortal.web.defaultPage.service.DefaultPageService;
import com.gorilla.webPortal.web.utils.ServiceLocator;
import com.gorilla.webPortal.web.utils.chat.ChatRecorder;
import com.gorilla.webPortal.web.utils.chat.CheckChatRoomStatus;
import com.gorilla.webPortal.web.utils.chat.MessageBatchSenderNormal;
import com.gorilla.webPortal.web.utils.chat.obj.ChatMessage;

@WebServlet(urlPatterns = { "/chat" }, asyncSupported = true)
public class ChatServlet extends HttpServlet {

 // Assigned in init(); null until the servlet has been initialised.
 protected Logger logger = null;
 // Timeout handed to AsyncContext.setTimeout for every streaming connection.
 private static final Integer TIMEOUT = 240 * 60 * 1000; // milliseconds (240 minutes)

 // key: live stream channel id, value: async contexts of the clients currently connected to that channel's chat room
 protected static final ConcurrentMap<Integer, Queue<AsyncContext>> chatRooms = new ConcurrentHashMap<Integer, Queue<AsyncContext>>();
 // key: live stream channel id, value: monitor object used to serialise creation of that channel's chat room
 private static final ConcurrentMap<Integer, Object> roomKeys = new ConcurrentHashMap<Integer, Object>();
 // key: live stream channel id, value: thread running that channel's ChatRecorder
 private static final ConcurrentMap<Integer, Thread> chatRecorderJobs = new ConcurrentHashMap<Integer, Thread>();
 private static final ConcurrentMap<Integer, ChatRecorder> chatRecorders = new ConcurrentHashMap<Integer, ChatRecorder>(); // key: live stream channel id, value: that channel's chat recorder
 // key: live stream channel id, value: thread running that channel's MessageSender
 private static final ConcurrentMap<Integer, Thread> messageSenderJobs = new ConcurrentHashMap<Integer, Thread>();
 private static final ConcurrentMap<Integer, MessageSender> messageSenders = new ConcurrentHashMap<Integer, MessageSender>(); // key: live stream channel id, value: sender that delivers messages to every user in that channel
 private static final ConcurrentMap<Integer, MessageBatchSenderNormal> messageBatchSenders = new ConcurrentHashMap<Integer, MessageBatchSenderNormal>(); // key: live stream channel id, value: batch sender that replays earlier discussion messages to a newly joined user
 
 /**
  * Initialises the servlet: creates the logger, registers a lock object and a
  * batch sender for every stream channel returned by the page service, and
  * starts the daemon thread that runs {@link CheckChatRoomStatus}.
  *
  * @param config servlet configuration supplied by the container
  * @throws ServletException if the superclass initialisation fails
  */
 @Override
 public void init(ServletConfig config) throws ServletException {
     super.init(config);
     this.logger = LoggerFactory.getLogger(getClass());

     DefaultPageService pageService = ServiceLocator.getInstance().getDefaultPageService();
     List<StreamChannel> knownChannels = pageService.getStreamChannels();

     // One lock object and one batch sender per channel known at start-up.
     for (StreamChannel channel : knownChannels) {
         roomKeys.put(channel.getId(), new Object());
         messageBatchSenders.put(channel.getId(), new MessageBatchSenderNormal());
     }

     // Dedicated thread that checks the state of every chat room. When a live
     // channel is closed or switches to replay, the Queue<AsyncContext> and the
     // chat recorder registered under that room id have to be removed.
     Thread roomStatusWatcher = new Thread(
             new CheckChatRoomStatus(chatRooms, chatRecorderJobs, chatRecorders,
                     messageSenderJobs, messageSenders, messageBatchSenders),
             "checkChatRoomStatusJob");
     roomStatusWatcher.setDaemon(true);
     roomStatusWatcher.start();
 }
 /**
  * Treats a GET exactly like a POST by forwarding to
  * {@link #doPost(HttpServletRequest, HttpServletResponse)}.
  */
 @Override
 protected void doGet(HttpServletRequest req, HttpServletResponse res)
         throws IOException, ServletException {
     doPost(req, res);
 }

 /**
  * Entry point for both chat actions.
  * <ul>
  * <li>{@code actionType=sendMessage}: queues the message for the channel's
  * batch sender, message sender and chat recorder.</li>
  * <li>anything else: opens a long-lived async response and adds it to the
  * channel's chat room so the message sender can push to it.</li>
  * </ul>
  * The chat room of a channel is created lazily by the first request for it.
  *
  * @param request  must carry an integer {@code floor} parameter (the stream
  *                 channel id); optional {@code actionType}, {@code name},
  *                 {@code message} and {@code firstIn}
  * @param response receives a 400 error when {@code floor} is missing or not
  *                 a number, a plain-text notice when the channel is not
  *                 online, or the event stream
  * @throws IOException      if writing the response fails
  * @throws ServletException declared by the servlet contract
  */
 @Override
 protected void doPost(HttpServletRequest request,
         HttpServletResponse response) throws IOException, ServletException {

     final Integer streamChannelId = parseStreamChannelId(request);
     if (streamChannelId == null) {
         // Previously an uncaught NumberFormatException (HTTP 500).
         response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                 "Missing or invalid 'floor' parameter");
         return;
     }

     if (chatRooms.get(streamChannelId) == null
             && !openChatRoom(streamChannelId, response)) {
         return;
     }

     if ("sendMessage".equalsIgnoreCase(request.getParameter("actionType"))) {
         publishMessage(streamChannelId, request);
     } else {
         subscribe(streamChannelId, request, response);
     }
 }

 /** Reads the {@code floor} parameter; returns null when it is absent or not an integer. */
 private static Integer parseStreamChannelId(HttpServletRequest request) {
     String floor = request.getParameter("floor");
     if (floor == null) {
         return null;
     }
     try {
         return Integer.valueOf(floor.trim());
     } catch (NumberFormatException e) {
         return null;
     }
 }

 /** Tells the caller that the requested channel has no running live stream. */
 private static void replyNotStarted(Integer streamChannelId,
         HttpServletResponse response) throws IOException {
     PrintWriter pw = response.getWriter();
     pw.write("The Stream Channel " + streamChannelId + "F has not started yet");
     pw.flush();
 }

 /** Starts {@code job} on a named daemon thread and returns that thread. */
 private static Thread startDaemon(Runnable job, String threadName) {
     Thread worker = new Thread(job, threadName);
     worker.setDaemon(true);
     worker.start();
     return worker;
 }

 /**
  * Creates the chat room for a channel if it does not exist yet.
  *
  * @return true when the room exists on return; false when the channel is not
  *         online, in which case the notice has already been written
  */
 private boolean openChatRoom(Integer streamChannelId,
         HttpServletResponse response) throws IOException {
     DefaultPageService defaultPageService = ServiceLocator.getInstance()
             .getDefaultPageService();

     // roomKeys is only filled in init(); a channel that was not listed then
     // has no lock object and used to cause a NullPointerException in
     // synchronized(...). Register a key only for channels that are really
     // online so arbitrary ids cannot grow the map.
     Object roomKey = roomKeys.get(streamChannelId);
     if (roomKey == null) {
         if (defaultPageService.getStreamChannel(streamChannelId, "ONLINE") == null) {
             replyNotStarted(streamChannelId, response);
             return false;
         }
         Object candidate = new Object();
         Object existing = roomKeys.putIfAbsent(streamChannelId, candidate);
         roomKey = (existing != null) ? existing : candidate;
     }

     synchronized (roomKey) {
         if (chatRooms.get(streamChannelId) != null) {
             return true; // another request created the room while we waited
         }

         StreamChannel streamChannel = defaultPageService
                 .getStreamChannel(streamChannelId, "ONLINE");
         if (streamChannel == null) {
             replyNotStarted(streamChannelId, response);
             return false;
         }

         // Do the lookups that can fail before anything is registered.
         // endsWith also copes with an empty path, unlike charAt(length - 1).
         String basePath = WebPortalConstant.CHATROOM_FILE_PATH;
         String filePath = basePath.endsWith("/") ? basePath : basePath + "/";
         // NOTE(review): assumes a history row exists for an ONLINE channel - confirm.
         LEStreamChannelHistory leStreamChannelHistory = defaultPageService
                 .getLatestStreamChannelHistory(streamChannelId,
                         streamChannel.getPublishDate());

         Queue<AsyncContext> asyncContextQueue = new ConcurrentLinkedQueue<AsyncContext>();

         messageBatchSenders.put(streamChannelId, new MessageBatchSenderNormal());

         ChatRecorder chatRecorder = new ChatRecorder("D:/ContentStorage"
                 + filePath + leStreamChannelHistory.getLehid() + ".txt");
         chatRecorders.put(streamChannelId, chatRecorder);
         chatRecorderJobs.put(streamChannelId, startDaemon(chatRecorder,
                 "ChatRecorder[" + streamChannelId + "F]"));

         // The message sender job pushes messages to every AsyncContext
         // queued for this channel.
         MessageSender messageSender = new MessageSender(asyncContextQueue);
         messageSenders.put(streamChannelId, messageSender);
         messageSenderJobs.put(streamChannelId, startDaemon(messageSender,
                 "MessageSender[" + streamChannelId + "F]"));

         // Publish the room LAST: the unsynchronised null check in doPost
         // treats a present queue as "fully initialised", so the senders and
         // the recorder must already be registered at this point.
         chatRooms.put(streamChannelId, asyncContextQueue);
         return true;
     }
 }

 /** Hands a posted chat message to the batch sender, the live sender and the recorder. */
 private void publishMessage(Integer streamChannelId, HttpServletRequest request) {
     String message = request.getParameter("message");
     String name = request.getParameter("name");
     ChatMessage chatMessage = new ChatMessage(name, message);
     messageBatchSenders.get(streamChannelId).addMessage(chatMessage);
     messageSenders.get(streamChannelId).send(name, message);

     try {
         chatRecorders.get(streamChannelId).addChatMessage(chatMessage);
     } catch (InterruptedException e) {
         // Keep the interrupt visible to the container thread.
         Thread.currentThread().interrupt();
         logger.error("add Message to chatRecorder Map occur error, error info", e);
     }
 }

 /** Opens the async event stream for one client and adds it to the channel's room. */
 private void subscribe(final Integer streamChannelId,
         HttpServletRequest request, HttpServletResponse response) {
     // Media-type parameters are separated by ';' - the former ',' produced
     // an invalid Content-Type value.
     response.setContentType("text/event-stream; charset=UTF-8");
     response.setHeader("Cache-Control", "no-cache");
     response.setHeader("Connection", "keep-alive");
     response.setHeader("Access-Control-Allow-Origin", "*"); // for IE8,9 important
     boolean firstIn = Boolean.parseBoolean(request.getParameter("firstIn"));

     final AsyncContext ac = request.startAsync();
     ac.setTimeout(TIMEOUT);
     ac.addListener(new AsyncListener() {
         public void onComplete(AsyncEvent event) throws IOException {
             leaveRoom(streamChannelId, ac);
         }

         public void onTimeout(AsyncEvent event) throws IOException {
             leaveRoom(streamChannelId, ac);
         }

         public void onError(AsyncEvent event) throws IOException {
             leaveRoom(streamChannelId, ac);
         }

         public void onStartAsync(AsyncEvent event) throws IOException {
             // nothing to do
         }
     });

     if (firstIn) {
         // A newly joined client first receives the earlier messages.
         messageBatchSenders.get(streamChannelId).sendAllMessage(ac);
     }
     chatRooms.get(streamChannelId).add(ac);
     logger.info(streamChannelId + " Asyncontext size:"
             + chatRooms.get(streamChannelId).size());
 }

 /**
  * Removes a finished async context from its room. The room queue may already
  * be gone (the status checker is handed the chatRooms map), so a missing
  * queue is not an error.
  */
 private void leaveRoom(Integer streamChannelId, AsyncContext ac) {
     Queue<AsyncContext> room = chatRooms.get(streamChannelId);
     if (room == null) {
         return;
     }
     room.remove(ac);
     logger.info(streamChannelId + " Asyncontext size:" + room.size());
 }

 public class MessageSender implements Runnable {
  protected boolean running = true;
  protected ArrayList<String> messages = new ArrayList<String>(); //=>改成用 currentLinkedQueue or blockQueue
  private Queue<AsyncContext> asyncContextQueue;

  public MessageSender(Queue<AsyncContext> asyncContextQueue) {
   this.asyncContextQueue = asyncContextQueue;
  }

  public void stop() {
   running = false;
  }

  /**
   * Add message for sending.
   */
  public void send(String user, String message) {

   synchronized (messages) { //這邊就不需要synchronized
    JSONObject result = new JSONObject();

    try {
     result.put("message", message);
     result.put("name", user);
     result.put("sendTime", new Date().getTime());
    } catch (JSONException e) {
     // TODO Auto-generated catch block
     logger.error("add message occur error", e.getMessage());
    }
    messages.add(result.toString());
    messages.notify();
   }
  }

  public void run() {
   while (running) {
    PrintWriter out;

    if (messages.size() == 0) {
     try {
      synchronized (messages) {
       messages.wait();
      }
     } catch (InterruptedException e) {
      // Ignore
     }
    }
    synchronized (asyncContextQueue) {
     String[] pendingMessages = null;
     synchronized (messages) { //=>這邊也不需要了synchronized
      pendingMessages = messages.toArray(new String[messages.size()]); //直接queue.poll
      messages.clear();
     }
     for (AsyncContext ac : asyncContextQueue) {
      try {
       HttpServletRequest request = (HttpServletRequest) ac
         .getRequest();
       String ua = request.getHeader("User-Agent");
       boolean isMSIE = (ua != null && (ua.indexOf("MSIE") != -1 || ua
         .indexOf("Trident") != -1));
       PrintWriter acWriter = ac.getResponse().getWriter();
       for (int j = 0; j < pendingMessages.length; j++) {
        if (isMSIE)
         acWriter.println(URLEncoder.encode(
           pendingMessages[j], "UTF-8"));
        else
         acWriter.println("data: "
           + URLEncoder
             .encode(pendingMessages[j],
               "UTF-8") + "\n\n");
       }
       acWriter.flush();
       
       // ac.complete();
      } catch (IOException ex) {
       logger.error("out put message occur error:"
         + ex.getMessage());
       asyncContextQueue.remove(ac);
       logger.info("After error remove asyncContextQueue'size:"
         + asyncContextQueue.size());
      }
     }
    }

   }

  }

 }
}


2.前端js
joinChat: function(){
  fbName = '';
  FB.getLoginStatus(function(response) {
   if (response.status === 'connected') {
    $("#btnChatSwitch").off("click").val("離開討論");
          $("#btnChatSwitch").on("click", WP.LiveStream.leaveChat);
          $('#btnSendMessage').prop("disabled", false);
          
          $("#btnSendMessage").off("click");
          $("#btnSendMessage").on("click", WP.LiveStream.sendMessage);
          $("#inputBox").off("keyup");
          $("#inputBox").on("keyup", function(event){
            if (event.keyCode == 13 && event.shiftKey) {
             //alert('test');
             WP.LiveStream.sendMessage();
            }
          });
          
    $("#btnLogin").prop("disabled", true);
    
    
     FB.api('/me', function(response) {
            if (window.console){
             console.log('Successful login for: ' + response.name + "; userID ="+ response.id );
            }
             
            WP.LiveStream.fbName = response.name;
            
            //the receiveServerSend procedure has execute at click button #btnChatSwitch first
            if (typeof(EventSource) !== "undefined"){
             /*if (typeof(eventSource) !== "undefined")
              eventSource.close();
             else
              WP.LiveStream.receiveServerSend(response.name);*/
             //never build the comet connection
             if (WP.LiveStream.eventSource == undefined){
              WP.LiveStream.receiveServerSend(response.name, WP.LiveStream.firstIn);
                 WP.LiveStream.firstIn =false;
              
             }
             else{
              //has builed, do nothing.
             }
            }
            else{
             /*if (typeof(http) !== 'undefined' )
              http.abort();
             else
              WP.LiveStream.pollingChat(response.name);*/
             if (WP.LiveStream.http == undefined){
              WP.LiveStream.pollingChat(response.name , WP.LiveStream.firstIn);
               WP.LiveStream.firstIn = false;
             }
              
            }
          });
     
    
   }else if (response.status ==='unknown' || response.status ==='not_authorized'){
    $("#btnChatSwitch").off("click").val("離開討論");
    $("#btnChatSwitch").on("click", WP.LiveStream.leaveChat);
    $('#btnSendMessage').prop("disabled", true);
    
    $("#btnSendMessage").off("click");
    
    $("#btnLogin").prop("disabled", false);
    FB.Event.subscribe('auth.login', WP.LiveStream.joinChat); //when after login fb, call the callback "joinChat" method  
    //WP.LiveStream.receiveServerSend("", WP.LiveStream.firstIn);
    //WP.LiveStream.firstIn = false;
    //the receiveServerSend procedure has execute at click button #btnChatSwitch first
          if (typeof(EventSource) !== "undefined"){
           /*if (typeof(eventSource) !== "undefined")
            eventSource.close();
           else
            WP.LiveStream.receiveServerSend(response.name);*/
           //never build the comet connection
           if (WP.LiveStream.eventSource == undefined){
            WP.LiveStream.receiveServerSend("", WP.LiveStream.firstIn);
               WP.LiveStream.firstIn =false;
            
           }
           else{
            //has builed, do nothing.
           }
          }
          else{
           /*if (typeof(http) !== 'undefined' )
            http.abort();
           else
            WP.LiveStream.pollingChat(response.name);*/
           if (WP.LiveStream.http == undefined){
            WP.LiveStream.pollingChat("" , WP.LiveStream.firstIn);
             WP.LiveStream.firstIn = false;
           }
            
          }
   }
   WP.LiveStream.switchUISize("chatRoom");
  });
  
  
 },
 leaveChat: function(){
  if (typeof(WP.LiveStream.http) !== 'undefined' ){
   WP.LiveStream.http.abort();
   WP.LiveStream.http = undefined;
  }else if (typeof(WP.LiveStream.eventSource) !== 'undefined'){
   WP.LiveStream.eventSource.close();
   WP.LiveStream.eventSource = undefined;
  }
  WP.LiveStream.firstIn = true;
  $(".messages > li").remove();
  WP.LiveStream.switchUISize("normal");
  $("#btnChatSwitch").off("click").val("加入討論");
        $("#btnChatSwitch").on("click", WP.LiveStream.joinChat);
 },
 receiveServerSend: function(name, firstIn){
  var name = name || "";
  var url = "/"+ WP.webContext +"/chat/"; 
  var params = "name="+name+"&floor="+ WP.LiveStream.floor+"&firstIn="+ firstIn;
  if (typeof(EventSource) !== "undefined") {
        WP.LiveStream.eventSource = new EventSource(url+"?"+params);
        WP.LiveStream.eventSource.onmessage = function(event) {
         var decodeResponseText = decodeURIComponent(event.data);
         decodeResponseText = decodeResponseText.replace(/\+/g, " ");
        var result = $.parseJSON(decodeResponseText);
        renderChatMessage(result);
       
        };
     } else {
      //alert("Sorry, Server-Sent Event is not supported in your browser");
     }
  
 },
 pollingChat: function(name, firstIn){
   
   var url = "/"+ WP.webContext +"/chat/";
   var params = "name="+name+"&floor="+ WP.LiveStream.floor + "&firstIn="+ firstIn;
   var currentIndex = 0;
   //alert(params);
  setTimeout(function() {
      
   if (IE.actualVersion == "9"){ //for IE8,9
    WP.LiveStream.http = new XDomainRequest();
    //WP.LiveStream.http.timeout = WP.LiveStream.timeout;
    WP.LiveStream.http.open("get", url+"?"+params);
    WP.LiveStream.http.send();
    
    WP.LiveStream.http.onload = function(){
     //alert("come onload");
     WP.LiveStream.pollingChat(name, false);
    };
    WP.LiveStream.http.onerror = function() { 
     //alert("comet occur error"); 
    
    };
    WP.LiveStream.http.onprogress = function(){
     //alert("come progress");
      var kb = 2 * 1024;
     //var kb = 0;
         var currentResponseText = WP.LiveStream.http.responseText.substr(kb+currentIndex, WP.LiveStream.http.responseText.length);
         currentIndex = currentResponseText.length + currentIndex;
         var decodeResponseText = decodeURIComponent(currentResponseText);
         //currentIndex = decodeResponseText.length + currentIndex;
         decodeResponseText = decodeResponseText.replace(/\+/g, " ");
         var result = $.parseJSON(decodeResponseText);
         //result = result.replaceAll("+"," ");
         
         renderChatMessage(result);
        
    };    
   }else{
    WP.LiveStream.http = new XMLHttpRequest();
    WP.LiveStream.http.open("GET", url+"?"+params, true);
    WP.LiveStream.http.timeout = WP.LiveStream.timeout;
    WP.LiveStream.http.setRequestHeader("Content-type", "application/x-www-form-urlencoded; charset=UTF-8");
    
        WP.LiveStream.http.onreadystatechange = function() {//Call a function when the state changes.
        if(WP.LiveStream.http.readyState == 1 ) {
        //$(".messages").after("<li class='message'>"+http.responseText+"</li>");
        } 
        if(WP.LiveStream.http.readyState == 3 && WP.LiveStream.http.status == 200) {
            
         var currentResponseText = WP.LiveStream.http.responseText.substr(currentIndex, WP.LiveStream.http.responseText.length);
         currentIndex = currentResponseText.length + currentIndex;
         var decodeResponseText = decodeURIComponent(currentResponseText);
         //currentIndex = decodeResponseText.length + currentIndex;
         decodeResponseText = decodeResponseText.replace(/\+/g, " ");
         var result = $.parseJSON(decodeResponseText);
         //result = result.replaceAll("+"," ");
         
         renderChatMessage(result);
         
           }else if (WP.LiveStream.http.readyState == 4 && WP.LiveStream.http.status == 200 ){
           //alert(http.responseText);
           WP.LiveStream.pollingChat(name, false);
           }
          };
        WP.LiveStream.http.send();
   }
   
  },1000);
 },
 sendMessage: function(){
  $("#btnSendMessage").prop("disabled",true);
  var message = $("#inputBox").val();
   $("#inputBox").val('');
  $.ajax({
   type: "POST",
   url: "/"+ WP.webContext +"/chat/",
   data: {
    'floor'    : WP.LiveStream.floor,
    'message'  : message,
    'name'     : WP.LiveStream.fbName,
    'actionType': 'sendMessage'
    //'categoryId'  : WP.API.VODChannel.categoryId,
   },
   dataType: "html",
  }).done(function(result){
   $("#btnSendMessage").prop("disabled",false);
  });
 },

沒有留言:

張貼留言