1.後端Servlet
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gorilla.entity.LEStreamChannelHistory;
import com.gorilla.webPortal.WebPortalConstant;
import com.gorilla.webPortal.web.api.view.object.StreamChannel;
import com.gorilla.webPortal.web.defaultPage.service.DefaultPageService;
import com.gorilla.webPortal.web.utils.ServiceLocator;
import com.gorilla.webPortal.web.utils.chat.ChatRecorder;
import com.gorilla.webPortal.web.utils.chat.CheckChatRoomStatus;
import com.gorilla.webPortal.web.utils.chat.MessageBatchSenderNormal;
import com.gorilla.webPortal.web.utils.chat.obj.ChatMessage;
@WebServlet(urlPatterns = { "/chat" }, asyncSupported = true)
public class ChatServlet extends HttpServlet {
protected Logger logger = null;
private static final Integer TIMEOUT = 240 * 60 * 1000; //milliSeconds
protected static final ConcurrentMap<Integer, Queue<AsyncContext>> chatRooms = new ConcurrentHashMap<Integer, Queue<AsyncContext>>();
private static final ConcurrentMap<Integer, Object> roomKeys = new ConcurrentHashMap<Integer, Object>();
private static final ConcurrentMap<Integer, Thread> chatRecorderJobs = new ConcurrentHashMap<Integer, Thread>();
private static final ConcurrentMap<Integer, ChatRecorder> chatRecorders = new ConcurrentHashMap<Integer, ChatRecorder>(); //key:直播頻道id, value:對應的 聊天室紀錄器
private static final ConcurrentMap<Integer, Thread> messageSenderJobs = new ConcurrentHashMap<Integer, Thread>();
private static final ConcurrentMap<Integer, MessageSender> messageSenders = new ConcurrentHashMap<Integer, MessageSender>(); //key:直播頻道id, value:用來發送訊息給該頻道的所有用戶
private static final ConcurrentMap<Integer, MessageBatchSenderNormal> messageBatchSenders = new ConcurrentHashMap<Integer, MessageBatchSenderNormal>(); //key:直播頻道id, value:批次顯示之前討論訊息給新登入的使用者
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
this.logger = LoggerFactory.getLogger(getClass());
DefaultPageService defaultPageService = ServiceLocator.getInstance().getDefaultPageService();
List<StreamChannel> streamChannels = defaultPageService.getStreamChannels();
for (StreamChannel item : streamChannels)
{
roomKeys.put(item.getId(), new Object());
messageBatchSenders.put(item.getId(), new MessageBatchSenderNormal());
}
CheckChatRoomStatus checkChatRoomStatus = new CheckChatRoomStatus(chatRooms, chatRecorderJobs, chatRecorders, messageSenderJobs, messageSenders, messageBatchSenders);
//專門用來檢查各個聊天室狀態的thread,當直播頻道被關閉或進入replay時,聊天室id對應的Queue<AsyncContext>、聊天紀錄器、須被移除
Thread checkChatRoomStatusJob = new Thread(checkChatRoomStatus, "checkChatRoomStatusJob");
checkChatRoomStatusJob.setDaemon(true);
checkChatRoomStatusJob.start();
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws IOException, ServletException {
this.doPost(req, res);
}
@Override
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
DefaultPageService defaultPageService = ServiceLocator.getInstance()
.getDefaultPageService();
Integer streamChannelId = Integer.parseInt(request
.getParameter("floor"));
if (chatRooms.get(streamChannelId) == null) {
synchronized (roomKeys.get(streamChannelId)) {
Queue<AsyncContext> asyncContextQueue = chatRooms
.get(streamChannelId);
if (asyncContextQueue == null) {
StreamChannel streamChannel = defaultPageService
.getStreamChannel(streamChannelId, "ONLINE");
if (streamChannel == null) {
PrintWriter pw = response.getWriter();
pw.write("The Stream Channel " + streamChannelId
+ "F has not started yet");
pw.flush();
return;
}
// new a specific streamChannelId's ConcurrentLinkedQueue
asyncContextQueue = new ConcurrentLinkedQueue<AsyncContext>();
chatRooms.put(streamChannelId, asyncContextQueue);
// new MessageBatchSenderNormal
MessageBatchSenderNormal messageBatchSender = new MessageBatchSenderNormal();
messageBatchSenders
.put(streamChannelId, messageBatchSender);
String filePath = "";
if (WebPortalConstant.CHATROOM_FILE_PATH
.charAt(WebPortalConstant.CHATROOM_FILE_PATH
.length() - 1) == '/') {
filePath = WebPortalConstant.CHATROOM_FILE_PATH;
} else {
filePath = WebPortalConstant.CHATROOM_FILE_PATH + "/";
}
LEStreamChannelHistory leStreamChannelHistory = defaultPageService
.getLatestStreamChannelHistory(streamChannelId,
streamChannel.getPublishDate());
ChatRecorder chatRecorder = new ChatRecorder(
"D:/ContentStorage" + filePath
+ leStreamChannelHistory.getLehid() + ".txt");
chatRecorders.put(streamChannelId, chatRecorder);
Thread chatRecorderJob = new Thread(chatRecorder,
"ChatRecorder[" + streamChannelId + "F]");
chatRecorderJob.setDaemon(true);
chatRecorderJob.start();
chatRecorderJobs.put(streamChannelId, chatRecorderJob);
// start a messageSender job,用來發送訊息給同一頻道中的Queue<AsyncContext>
MessageSender messageSender = new MessageSender(
asyncContextQueue);
messageSenders.put(streamChannelId, messageSender);
Thread messageSenderThread = new Thread(messageSender,
"MessageSender[" + streamChannelId + "F]");
messageSenderThread.setDaemon(true);
messageSenderThread.start();
messageSenderJobs.put(streamChannelId, messageSenderThread);
}
}
}
if ("sendMessage".equalsIgnoreCase(request.getParameter("actionType"))) {
String message = request.getParameter("message");
String name = request.getParameter("name");
ChatMessage chatMessage = new ChatMessage(name, message);
messageBatchSenders.get(streamChannelId).addMessage(chatMessage);
messageSenders.get(streamChannelId).send(name, message);
try {
chatRecorders.get(streamChannelId).addChatMessage(chatMessage);
// chatRecorder1F.addChatMessage(chatMessage);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
logger.error("add Message to chatRecorder Map occur error, error info",e);
}
} else {
response.setContentType("text/event-stream, charset=UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
response.setHeader("Access-Control-Allow-Origin", "*"); //for IE8,9 important
// String floor = request.getParameter("floor");
String name = request.getParameter("name");
Boolean firstIn = Boolean.parseBoolean(request
.getParameter("firstIn"));
final AsyncContext ac = request.startAsync();
ac.setTimeout(TIMEOUT);
ac.addListener(new AsyncListener() {
public void onComplete(AsyncEvent event) throws IOException {
Integer streamChannelId = Integer.parseInt(ac.getRequest()
.getParameter("floor"));
chatRooms.get(streamChannelId).remove(ac);
System.out.println(streamChannelId + " Asyncontext size:"
+ chatRooms.get(streamChannelId).size());
}
public void onTimeout(AsyncEvent event) throws IOException {
Integer streamChannelId = Integer.parseInt(ac.getRequest()
.getParameter("floor"));
chatRooms.get(streamChannelId).remove(ac);
System.out.println(streamChannelId + " Asyncontext size:"
+ chatRooms.get(streamChannelId).size());
}
public void onError(AsyncEvent event) throws IOException {
Integer streamChannelId = Integer.parseInt(ac.getRequest()
.getParameter("floor"));
chatRooms.get(streamChannelId).remove(ac);
System.out.println(streamChannelId + " Asyncontext size:"
+ chatRooms.get(streamChannelId).size());
}
public void onStartAsync(AsyncEvent event) throws IOException {
// System.out.println("Asyncontext size:"+
// asyncContextQueue.size());
}
});
if (firstIn) {
// messageBatchSender1F.addAsyncContext(ac);
messageBatchSenders.get(streamChannelId).sendAllMessage(ac);
// messageBatchSender1F.sendAllMessage(ac);
}
chatRooms.get(streamChannelId).add(ac);
logger.info(streamChannelId + " Asyncontext size:"
+ chatRooms.get(streamChannelId).size());
}
}
public class MessageSender implements Runnable {
protected boolean running = true;
protected ArrayList<String> messages = new ArrayList<String>(); //=>改成用 currentLinkedQueue or blockQueue
private Queue<AsyncContext> asyncContextQueue;
public MessageSender(Queue<AsyncContext> asyncContextQueue) {
this.asyncContextQueue = asyncContextQueue;
}
public void stop() {
running = false;
}
/**
* Add message for sending.
*/
public void send(String user, String message) {
synchronized (messages) { //這邊就不需要synchronized
JSONObject result = new JSONObject();
try {
result.put("message", message);
result.put("name", user);
result.put("sendTime", new Date().getTime());
} catch (JSONException e) {
// TODO Auto-generated catch block
logger.error("add message occur error", e.getMessage());
}
messages.add(result.toString());
messages.notify();
}
}
public void run() {
while (running) {
PrintWriter out;
if (messages.size() == 0) {
try {
synchronized (messages) {
messages.wait();
}
} catch (InterruptedException e) {
// Ignore
}
}
synchronized (asyncContextQueue) {
String[] pendingMessages = null;
synchronized (messages) { //=>這邊也不需要了synchronized
pendingMessages = messages.toArray(new String[messages.size()]); //直接queue.poll
messages.clear();
}
for (AsyncContext ac : asyncContextQueue) {
try {
HttpServletRequest request = (HttpServletRequest) ac
.getRequest();
String ua = request.getHeader("User-Agent");
boolean isMSIE = (ua != null && (ua.indexOf("MSIE") != -1 || ua
.indexOf("Trident") != -1));
PrintWriter acWriter = ac.getResponse().getWriter();
for (int j = 0; j < pendingMessages.length; j++) {
if (isMSIE)
acWriter.println(URLEncoder.encode(
pendingMessages[j], "UTF-8"));
else
acWriter.println("data: "
+ URLEncoder
.encode(pendingMessages[j],
"UTF-8") + "\n\n");
}
acWriter.flush();
// ac.complete();
} catch (IOException ex) {
logger.error("out put message occur error:"
+ ex.getMessage());
asyncContextQueue.remove(ac);
logger.info("After error remove asyncContextQueue'size:"
+ asyncContextQueue.size());
}
}
}
}
}
}
}
2.前端js
joinChat: function(){
fbName = '';
FB.getLoginStatus(function(response) {
if (response.status === 'connected') {
$("#btnChatSwitch").off("click").val("離開討論");
$("#btnChatSwitch").on("click", WP.LiveStream.leaveChat);
$('#btnSendMessage').prop("disabled", false);
$("#btnSendMessage").off("click");
$("#btnSendMessage").on("click", WP.LiveStream.sendMessage);
$("#inputBox").off("keyup");
$("#inputBox").on("keyup", function(event){
if (event.keyCode == 13 && event.shiftKey) {
//alert('test');
WP.LiveStream.sendMessage();
}
});
$("#btnLogin").prop("disabled", true);
FB.api('/me', function(response) {
if (window.console){
console.log('Successful login for: ' + response.name + "; userID ="+ response.id );
}
WP.LiveStream.fbName = response.name;
//the receiveServerSend procedure has execute at click button #btnChatSwitch first
if (typeof(EventSource) !== "undefined"){
/*if (typeof(eventSource) !== "undefined")
eventSource.close();
else
WP.LiveStream.receiveServerSend(response.name);*/
//never build the comet connection
if (WP.LiveStream.eventSource == undefined){
WP.LiveStream.receiveServerSend(response.name, WP.LiveStream.firstIn);
WP.LiveStream.firstIn =false;
}
else{
//has builed, do nothing.
}
}
else{
/*if (typeof(http) !== 'undefined' )
http.abort();
else
WP.LiveStream.pollingChat(response.name);*/
if (WP.LiveStream.http == undefined){
WP.LiveStream.pollingChat(response.name , WP.LiveStream.firstIn);
WP.LiveStream.firstIn = false;
}
}
});
}else if (response.status ==='unknown' || response.status ==='not_authorized'){
$("#btnChatSwitch").off("click").val("離開討論");
$("#btnChatSwitch").on("click", WP.LiveStream.leaveChat);
$('#btnSendMessage').prop("disabled", true);
$("#btnSendMessage").off("click");
$("#btnLogin").prop("disabled", false);
FB.Event.subscribe('auth.login', WP.LiveStream.joinChat); //when after login fb, call the callback "joinChat" method
//WP.LiveStream.receiveServerSend("", WP.LiveStream.firstIn);
//WP.LiveStream.firstIn = false;
//the receiveServerSend procedure has execute at click button #btnChatSwitch first
if (typeof(EventSource) !== "undefined"){
/*if (typeof(eventSource) !== "undefined")
eventSource.close();
else
WP.LiveStream.receiveServerSend(response.name);*/
//never build the comet connection
if (WP.LiveStream.eventSource == undefined){
WP.LiveStream.receiveServerSend("", WP.LiveStream.firstIn);
WP.LiveStream.firstIn =false;
}
else{
//has builed, do nothing.
}
}
else{
/*if (typeof(http) !== 'undefined' )
http.abort();
else
WP.LiveStream.pollingChat(response.name);*/
if (WP.LiveStream.http == undefined){
WP.LiveStream.pollingChat("" , WP.LiveStream.firstIn);
WP.LiveStream.firstIn = false;
}
}
}
WP.LiveStream.switchUISize("chatRoom");
});
},
leaveChat: function(){
if (typeof(WP.LiveStream.http) !== 'undefined' ){
WP.LiveStream.http.abort();
WP.LiveStream.http = undefined;
}else if (typeof(WP.LiveStream.eventSource) !== 'undefined'){
WP.LiveStream.eventSource.close();
WP.LiveStream.eventSource = undefined;
}
WP.LiveStream.firstIn = true;
$(".messages > li").remove();
WP.LiveStream.switchUISize("normal");
$("#btnChatSwitch").off("click").val("加入討論");
$("#btnChatSwitch").on("click", WP.LiveStream.joinChat);
},
receiveServerSend: function(name, firstIn){
var name = name || "";
var url = "/"+ WP.webContext +"/chat/";
var params = "name="+name+"&floor="+ WP.LiveStream.floor+"&firstIn="+ firstIn;
if (typeof(EventSource) !== "undefined") {
WP.LiveStream.eventSource = new EventSource(url+"?"+params);
WP.LiveStream.eventSource.onmessage = function(event) {
var decodeResponseText = decodeURIComponent(event.data);
decodeResponseText = decodeResponseText.replace(/\+/g, " ");
var result = $.parseJSON(decodeResponseText);
renderChatMessage(result);
};
} else {
//alert("Sorry, Server-Sent Event is not supported in your browser");
}
},
pollingChat: function(name, firstIn){
var url = "/"+ WP.webContext +"/chat/";
var params = "name="+name+"&floor="+ WP.LiveStream.floor + "&firstIn="+ firstIn;
var currentIndex = 0;
//alert(params);
setTimeout(function() {
if (IE.actualVersion == "9"){ //for IE8,9
WP.LiveStream.http = new XDomainRequest();
//WP.LiveStream.http.timeout = WP.LiveStream.timeout;
WP.LiveStream.http.open("get", url+"?"+params);
WP.LiveStream.http.send();
WP.LiveStream.http.onload = function(){
//alert("come onload");
WP.LiveStream.pollingChat(name, false);
};
WP.LiveStream.http.onerror = function() {
//alert("comet occur error");
};
WP.LiveStream.http.onprogress = function(){
//alert("come progress");
var kb = 2 * 1024;
//var kb = 0;
var currentResponseText = WP.LiveStream.http.responseText.substr(kb+currentIndex, WP.LiveStream.http.responseText.length);
currentIndex = currentResponseText.length + currentIndex;
var decodeResponseText = decodeURIComponent(currentResponseText);
//currentIndex = decodeResponseText.length + currentIndex;
decodeResponseText = decodeResponseText.replace(/\+/g, " ");
var result = $.parseJSON(decodeResponseText);
//result = result.replaceAll("+"," ");
renderChatMessage(result);
};
}else{
WP.LiveStream.http = new XMLHttpRequest();
WP.LiveStream.http.open("GET", url+"?"+params, true);
WP.LiveStream.http.timeout = WP.LiveStream.timeout;
WP.LiveStream.http.setRequestHeader("Content-type", "application/x-www-form-urlencoded; charset=UTF-8");
WP.LiveStream.http.onreadystatechange = function() {//Call a function when the state changes.
if(WP.LiveStream.http.readyState == 1 ) {
//$(".messages").after("<li class='message'>"+http.responseText+"</li>");
}
if(WP.LiveStream.http.readyState == 3 && WP.LiveStream.http.status == 200) {
var currentResponseText = WP.LiveStream.http.responseText.substr(currentIndex, WP.LiveStream.http.responseText.length);
currentIndex = currentResponseText.length + currentIndex;
var decodeResponseText = decodeURIComponent(currentResponseText);
//currentIndex = decodeResponseText.length + currentIndex;
decodeResponseText = decodeResponseText.replace(/\+/g, " ");
var result = $.parseJSON(decodeResponseText);
//result = result.replaceAll("+"," ");
renderChatMessage(result);
}else if (WP.LiveStream.http.readyState == 4 && WP.LiveStream.http.status == 200 ){
//alert(http.responseText);
WP.LiveStream.pollingChat(name, false);
}
};
WP.LiveStream.http.send();
}
},1000);
},
sendMessage: function(){
$("#btnSendMessage").prop("disabled",true);
var message = $("#inputBox").val();
$("#inputBox").val('');
$.ajax({
type: "POST",
url: "/"+ WP.webContext +"/chat/",
data: {
'floor' : WP.LiveStream.floor,
'message' : message,
'name' : WP.LiveStream.fbName,
'actionType': 'sendMessage'
//'categoryId' : WP.API.VODChannel.categoryId,
},
dataType: "html",
}).done(function(result){
$("#btnSendMessage").prop("disabled",false);
});
},