|
Hello,
i'm writing an application in which screens a little section shows some infos to the user that are pushed by the server (kind of events the user is interested in, like the status of some previous long running actions he undertook). The user might have multiple browsers open and I want to notify all those (thus basing my push upon the user login for exemple). I found lot of docs and always the same sample with broadcasts, but what about this kind of use case ? We plan to start with 4000 users and are working with Weblogic 10 servers. I know this post is very general, but I tried to find my way in all samples, docs, posts, and haven't found anything explicit for so far. |
|
Administrator
|
Salut,
Using Broadcaster#broadcast(events, AtmosphereResource) http://is.gd/gS5Av is what you want to use, e.g you broadcast a message to a single user. You can also do Broadcaster#broadcast(events, Set<AtmosphereResource>) http://is.gd/gS5Df to broadcast to a subset of user. Does that help? -- Jeanfrancois |
|
OK, I thought I'd have to use a resume instead of a broadcast since it was to continue a response (or 2 responses)...
But what I have more difficulties is to see how to use all this with the client side... I'd like to be able to use any comet protocol or websocket...
Do I have to use jQuery and Meteor ? While reading all this, I thought I'd have to use Meteor, have my own servlet that would suspend all incoming get's and that would resume (or broadcast based on your reply) the response to the user for who I receive an update from another request. Those requests come to the servlet from another server application, pushing updates for a precise logged in user.
I even thought one moment to use the pub/sub, based on a channel per login... but it seems maybe a little bit overkill... Would you have any pointer / direction to send me ?
Thx in advance, Mike 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email]> Salut, |
|
Salut,
On 10-11-10 3:59 AM, mikeD wrote: > OK, I thought I'd have to use a resume instead of a broadcast since it > was to continue a response (or 2 responses)... Can you elaborate a little bit? Broadcast will send an message to the remote client. You can resume on broadcast (long polling) or leave the connection open (http-streaming, websocket). > > But what I have more difficulties is to see how to use all this with the > client side... I'd like to be able to use any comet protocol or websocket... Take a look at the Atmosphere JQuery Plugin: http://is.gd/gTVYN The sample jquery-* all support all transport. http://is.gd/frznC > > Do I have to use jQuery and Meteor ? The same way described in the above blog. Take a look at the Wicket sample, which use Meteor to support Comet and Websocket with JQuery Plugin http://is.gd/fOTK6 You can download the sample using the link above. While reading all this, I thought > I'd have to use Meteor, have my own servlet that would suspend all > incoming get's and that would resume (or broadcast based on your reply) > the response to the user for who I receive an update from another > request. OK if I understand properly, you don't want to broadcast events to all users (browser), but instead you want to always broadcast to a single user: > Meteor m = Meteor.build(req); > // Set a Broadcaster per Meteor. Use the servlet-path for ID > m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath()); > > m.suspend(); Then when you are ready to broadcast event: > m.broadcast("message").resume(); Those requests come to the servlet from another server > application, pushing updates for a precise logged in user. > > I even thought one moment to use the pub/sub, based on a channel per > login... but it seems maybe a little bit overkill... > > Would you have any pointer / direction to send me ? Let me know if that help! -- Jeanfrancois > > Thx in advance, > > Mike > > 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email] > </user/SendEmail.jtp?type=node&node=5723935&i=0>> > > Salut, > > Using Broadcaster#broadcast(events, AtmosphereResource) > > http://is.gd/gS5Av > > is what you want to use, e.g you broadcast a message to a single > user. You can also do Broadcaster#broadcast(events, > Set<AtmosphereResource>) > > http://is.gd/gS5Df > > to broadcast to a subset of user. > > Does that help? > > -- Jeanfrancois > > ------------------------------------------------------------------------ > View message @ > http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t> > > To unsubscribe from Finding my way - push to individual users, click > here > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>. > > > > > ------------------------------------------------------------------------ > View this message in context: Re: Finding my way - push to individual > users > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html> > Sent from the Atmosphere users mailling list mailing list archive > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at > Nabble.com. |
|
Hi,
I start to understand the mechanism now, while reading all samples... when you say that // Set a Broadcaster per Meteor. Use the servlet-path for ID m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath()); Maybe i'm wrong but i think this will create an instance of Broadcaster per servlet path and then set it on the Meteor... ending in having a broadcaster instance for the servlet path, which is the same for every user... wouldn't it have to be created using the username as Id ? Or maybe you assume that the servlet path is unique per user, meaning that I would have something like /MessageHandler/jfarcand for you and /MessageHandler/mike for me.... is the latest what you assume ?
Mike
2010/11/10 jfarcand <[hidden email]> Salut, |
|
In reply to this post by Jeanfrancois Arcand-4
Hello,
goign further with code... I try to have a webpage in which the logged in user will receive messages sent by the system (various applications). Only the targeted user should receive the messages, they are personnal.
I wrote a web page (html), setup a webapp with 2 servlets : - Meteor managing my first servlet (ContextPublisher) - my second servlet (BroadcastServlet). The first one is used by the page to get the messages
The second one is called by other apps (myself with another browser for the moment) to send messages (Strings) to a given user. I see in the server logs that when I open the webpage in a browser, it does a GET on the first servlet and is suspended.
I see in the logs that when I call the second servlet, a broadcast will be issued to the right user-related Broadcasters I don't see the onBroadcast fired. The webpage never receive the message.
Based on this, I assume this is not the right way to broadcast, but what is wrong ? In attachment, the code for the page and the 2 servlets. Thanks in advance for your help !
2010/11/10 jfarcand-3 [via Atmosphere users mailling list] <[hidden email]>
|
|
In reply to this post by mikeD
Salut,
On 10-11-16 2:50 AM, Michael Dewitte wrote: > Hi, > > I start to understand the mechanism now, while reading all samples... > > when you say that > // Set a Broadcaster per Meteor. Use the servlet-path for ID > m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, > req.getServletPath()); > > Maybe i'm wrong but i think this will create an instance of Broadcaster > per servlet path and then set it on the Meteor... ending in having a > broadcaster instance for the servlet path, which is the same for every > user... Yes, for the clock sample I wanted to use a single Broadcaster/ wouldn't it have to be created using the username as Id ? You could, but that means only that user will received events when you invoke Broadcaster.broadcast. No events will be shared. Or > maybe you assume that the servlet path is unique per user, meaning that > I would have something like /MessageHandler/jfarcand for you and > /MessageHandler/mike for me.... is the latest what you assume ? It really an application decision. If you write a chat, you will create a Meteor per chat room. In the sample I didn't needed that. Thanks! -- jeanfrancois > > Mike > > 2010/11/10 jfarcand <[hidden email] <mailto:[hidden email]>> > > Salut, > > > On 10-11-10 3:59 AM, mikeD wrote: > > OK, I thought I'd have to use a resume instead of a broadcast > since it > was to continue a response (or 2 responses)... > > > Can you elaborate a little bit? Broadcast will send an message to > the remote client. You can resume on broadcast (long polling) or > leave the connection open (http-streaming, websocket). > > > > > But what I have more difficulties is to see how to use all this > with the > client side... I'd like to be able to use any comet protocol or > websocket... > > > Take a look at the Atmosphere JQuery Plugin: > > http://is.gd/gTVYN > > The sample jquery-* all support all transport. > > http://is.gd/frznC > > > > Do I have to use jQuery and Meteor ? > > > The same way described in the above blog. Take a look at the Wicket > sample, which use Meteor to support Comet and Websocket with JQuery > Plugin > > http://is.gd/fOTK6 > > You can download the sample using the link above. > > > > While reading all this, I thought > > I'd have to use Meteor, have my own servlet that would suspend all > incoming get's and that would resume (or broadcast based on your > reply) > the response to the user for who I receive an update from another > request. > > > OK if I understand properly, you don't want to broadcast events to > all users (browser), but instead you want to always broadcast to a > single user: > > Meteor m = Meteor.build(req); > // Set a Broadcaster per Meteor. Use the servlet-path for ID > m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, > req.getServletPath()); > > m.suspend(); > > > Then when you are ready to broadcast event: > > m.broadcast("message").resume(); > > > > Those requests come to the servlet from another server > > application, pushing updates for a precise logged in user. > > I even thought one moment to use the pub/sub, based on a channel per > login... but it seems maybe a little bit overkill... > > Would you have any pointer / direction to send me ? > > > Let me know if that help! > > -- Jeanfrancois > > > > Thx in advance, > > Mike > > 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden > email] > </user/SendEmail.jtp?type=node&node=5723935&i=0>> > > > Salut, > > Using Broadcaster#broadcast(events, AtmosphereResource) > > http://is.gd/gS5Av > > is what you want to use, e.g you broadcast a message to a single > user. You can also do Broadcaster#broadcast(events, > Set<AtmosphereResource>) > > http://is.gd/gS5Df > > to broadcast to a subset of user. > > Does that help? > > -- Jeanfrancois > > > ------------------------------------------------------------------------ > View message @ > http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t> > > > To unsubscribe from Finding my way - push to individual > users, click > here > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>>. > > > > > ------------------------------------------------------------------------ > View this message in context: Re: Finding my way - push to > individual > users > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html> > > Sent from the Atmosphere users mailling list mailing list archive > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at > Nabble.com. > > |
|
In reply to this post by mikeD
Salut,
On 10-11-16 8:38 AM, mikeD wrote: > Hello, > > goign further with code... I try to have a webpage in which the logged > in user will receive messages sent by the system (various applications). > Only the targeted user should receive the messages, they are personnal. > > I wrote a web page (html), setup a webapp with 2 servlets : > - Meteor managing my first servlet (ContextPublisher) > - my second servlet (BroadcastServlet). > > The first one is used by the page to get the messages > The second one is called by other apps (myself with another browser for > the moment) to send messages (Strings) to a given user. > > I see in the server logs that when I open the webpage in a browser, it > does a GET on the first servlet and is suspended. > I see in the logs that when I call the second servlet, a broadcast will > be issued to the right user-related Broadcasters > > I don't see the onBroadcast fired. > The webpage never receive the message. I suspect the broiadcaster used in second Servlet is not the proper one.Can you send me your code / war at jfarcand [at] apache [dot] org? > > Based on this, I assume this is not the right way to broadcast, but what > is wrong ? Right. > > In attachment, the code for the page and the 2 servlets. > > Thanks in advance for your help ! You are welcome. A+ --Jeanfrancois > > > 2010/11/10 jfarcand-3 [via Atmosphere users mailling list] <[hidden > email] </user/SendEmail.jtp?type=node&node=5743825&i=0>> > > Salut, > > On 10-11-10 3:59 AM, mikeD wrote: > > OK, I thought I'd have to use a resume instead of a broadcast > since it > > was to continue a response (or 2 responses)... > > Can you elaborate a little bit? Broadcast will send an message to the > remote client. You can resume on broadcast (long polling) or leave the > connection open (http-streaming, websocket). > > > > > > But what I have more difficulties is to see how to use all this > with the > > client side... I'd like to be able to use any comet protocol or > websocket... > > Take a look at the Atmosphere JQuery Plugin: > > http://is.gd/gTVYN > > The sample jquery-* all support all transport. > > http://is.gd/frznC > > > > > Do I have to use jQuery and Meteor ? > > The same way described in the above blog. Take a look at the Wicket > sample, which use Meteor to support Comet and Websocket with JQuery > Plugin > > http://is.gd/fOTK6 > > You can download the sample using the link above. > > > While reading all this, I thought > > I'd have to use Meteor, have my own servlet that would suspend all > > incoming get's and that would resume (or broadcast based on your > reply) > > the response to the user for who I receive an update from another > > request. > > OK if I understand properly, you don't want to broadcast events to all > users (browser), but instead you want to always broadcast to a > single user: > > > Meteor m = Meteor.build(req); > > // Set a Broadcaster per Meteor. Use the servlet-path for ID > > > m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, > req.getServletPath()); > > > > m.suspend(); > > Then when you are ready to broadcast event: > > > m.broadcast("message").resume(); > > > Those requests come to the servlet from another server > > application, pushing updates for a precise logged in user. > > > > I even thought one moment to use the pub/sub, based on a channel per > > login... but it seems maybe a little bit overkill... > > > > Would you have any pointer / direction to send me ? > > Let me know if that help! > > -- Jeanfrancois > > > > > > Thx in advance, > > > > Mike > > > > 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden > email] > > </user/SendEmail.jtp?type=node&node=5723935&i=0>> > > > > Salut, > > > > > Using Broadcaster#broadcast(events, AtmosphereResource) > > > > http://is.gd/gS5Av > > > > is what you want to use, e.g you broadcast a message to a single > > user. You can also do Broadcaster#broadcast(events, > > Set<AtmosphereResource>) > > > > http://is.gd/gS5Df > > > > to broadcast to a subset of user. > > > > Does that help? > > > > -- Jeanfrancois > > > > > ------------------------------------------------------------------------ > > > View message @ > > > http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t&by-user=t> > > > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t&by-user=t&by-user=t>> > > > > > To unsubscribe from Finding my way - push to individual > users, click > > here > > > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t&by-user=t&by-user=t>>. > > > > > > > > > > > > ------------------------------------------------------------------------ > > > View this message in context: Re: Finding my way - push to > individual > > users > > > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html?by-user=t&by-user=t>> > > > Sent from the Atmosphere users mailling list mailing list archive > > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/ > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/?by-user=t&by-user=t>> > at > > Nabble.com. > > > ------------------------------------------------------------------------ > View message @ > http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5724753.html > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5724753.html?by-user=t> > > To unsubscribe from Finding my way - push to individual users, click > here > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>. > > > > > Message > > > No message yet > > > [BroadcastServlet.java] > ------------------------------------------------------------------------ > package be.forem.context.controller; > > import java.io.IOException; > import java.util.Enumeration; > import java.util.concurrent.Future; > > import javax.servlet.ServletException; > import javax.servlet.http.HttpServlet; > import javax.servlet.http.HttpServletRequest; > import javax.servlet.http.HttpServletResponse; > > import org.atmosphere.cpr.BroadcasterFactory; > import org.atmosphere.cpr.DefaultBroadcaster; > > > > public class BroadcastServlet extends HttpServlet { > private static final long serialVersionUID = -298281051064936872L; > > /** > * @see HttpServlet#doPut(HttpServletRequest, HttpServletResponse) > */ > protected void doGet(HttpServletRequest request, HttpServletResponse > response) throws ServletException, IOException { > String username = request.getParameter("username"); > String message = request.getParameter("message"); > String broadcasterId = "/ContextPublisher/"+username; > System.out.println("Broadcasting ("+message+") to : "+broadcasterId); > > try { > BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, > broadcasterId).broadcast("<script > type='text/javascript'>parent.setMsg('" + message + "')</script>\n"); > } catch (IllegalAccessException e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } catch (InstantiationException e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > > } > > } > > ------------------------------------------------------------------------ > > [ContextPublisher.java] > ------------------------------------------------------------------------ > package be.forem.context.controller; > > import java.io.IOException; > import java.util.Enumeration; > > import javax.servlet.ServletException; > import javax.servlet.http.HttpServlet; > import javax.servlet.http.HttpServletRequest; > import javax.servlet.http.HttpServletResponse; > > import org.atmosphere.cpr.AtmosphereResourceEvent; > import org.atmosphere.cpr.AtmosphereResourceEventListener; > import org.atmosphere.cpr.BroadcasterFactory; > import org.atmosphere.cpr.DefaultBroadcaster; > import org.atmosphere.cpr.Meteor; > > > public class ContextPublisher extends HttpServlet implements > AtmosphereResourceEventListener{ > > private static final long serialVersionUID = 1393519886718770628L; > > protected void doGet(HttpServletRequest request, HttpServletResponse > response) throws ServletException, IOException { > Meteor m = Meteor.build(request); > > try { > //create an id for the broadcaster, based on the username > String broadcasterId=request.getServletPath()+request.getPathInfo(); > System.out.println("Suspending response for broadcaster : " > +broadcasterId); > > m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, > broadcasterId)); > m.addListener(this); > request.getSession().setAttribute("meteor", m); > m.suspend(-1); > } catch (IllegalAccessException e) { > e.printStackTrace(); > } catch (InstantiationException e) { > e.printStackTrace(); > } > > > } > > public void onBroadcast(AtmosphereResourceEvent > <HttpServletRequest, HttpServletResponse> event) { > System.out.println("onBroadcast"); > > Meteor meteor = Meteor.lookup(event.getResource().getRequest()); > > meteor.removeListener(this); > meteor.resume(); > > } > > public void onDisconnect( > AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) { > // TODO Auto-generated method stub > System.out.println("onDisconnect"); > > } > > public void onResume( > AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) { > // TODO Auto-generated method stub > System.out.println("onResume"); > > } > > public void onSuspend( > AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) { > // TODO Auto-generated method stub > System.out.println("onSuspend"); > > } > > } > > ------------------------------------------------------------------------ > > ------------------------------------------------------------------------ > View this message in context: Re: Finding my way - push to individual > users > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5743825.html> > Sent from the Atmosphere users mailling list mailing list archive > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at > Nabble.com. |
|
For those who are also interested in this functionnality (I received a mail asking if I succeeded because someone was also trying to have kind of similar functionnality), I finally changed the strategy and ended with something working :
Finally, I ended with a working version as a POC. I based my work on the pubsub with jersey sample. I had to correct the fact it was with servlet 3. I had to correct the servlet mapping to be able to serve the html page. Then it worked. At this point, I added another servlet whose purpose was only to receive a get from any other browser, with as parameter the desired topic to publish on and a message. In this servlet, there's only one line of code : BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, request.getParameter("username")).broadcast(request.getParameter("message")); And that's it ! This shows I'm able to send a broadcast to every browser a user has open with some iframe subscribing to the topic (equals his username) based on the code of the sample, just by using a call to the broadcastservlet I wrote or even just using the line of code above... which allows even to use jms to queue messages, or even start from a pojo to send the broadcast... I don't know, though, how it will work in a cluster... i'll have to check in a near future... Hope it helps a bit... |
|
Salut,
Thanks for sharing...right now I'm swamped at work and I miss time to give better help here. On 10-11-30 7:11 AM, mikeD wrote: > > For those who are also interested in this functionnality (I received a mail > asking if I succeeded because someone was also trying to have kind of > similar functionnality), I finally changed the strategy and ended with > something working : > > Finally, I ended with a working version as a POC. I based my work on the > pubsub with jersey sample. I had to correct the fact it was with servlet 3. > I had to correct the servlet mapping to be able to serve the html page. Then > it worked. > > At this point, I added another servlet whose purpose was only to receive a > get from any other browser, with as parameter the desired topic to publish > on and a message. In this servlet, there's only one line of code : > > BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, > request.getParameter("username")).broadcast(request.getParameter("message")); > > And that's it ! This shows I'm able to send a broadcast to every browser a > user has open with some iframe subscribing to the topic (equals his > username) based on the code of the sample, just by using a call to the > broadcastservlet I wrote or even just using the line of code above... which > allows even to use jms to queue messages, or even start from a pojo to send > the broadcast... > > I don't know, though, how it will work in a cluster... i'll have to check in > a near future... Take a look at: * http://is.gd/hZsTM * http://is.gd/hZsXn You can easily cluster using Redis, JGroups, JMS or XMPP by either choosing the proper BroadcastFilter or Broadcaster. A+ - Jeanfrancois > > Hope it helps a bit... |
|
In reply to this post by mikeD
Oh, I see... you are using Atmosphere 'Broadcaster' to achieve messaging in a single JVM/Classloader scope.
So Atmosphere is really doing two jobs: a) a CPR to map Comet to various web servers, and B) replacing a JMS or other messaging system. I assume (B) is a pragmatic development because the first/obvious demo of Comet is for a Chat app. But one could alternatively use JMS? In that case, each web-user is associated with a 'Broadcaster' that is listening to a JMS topic or queue. (and in this case of 'push to individual users', the topic/queue is named by the username) Then, when a message is to be delivered to the user, one uses the username to lookup the JMS Destination and send to that topic/queue, which triggers the Listener, which delivers the message to Atmosphere (using AbstractBroadcastProxy.broadcastReceivedMessage(message) as in the JMSBroadcaster) The advantage here is that the *sender* of the message does not need to know about Atmosphere, and if you must span multiple servers/JVMs, you use the usual JMS/JNDI mechanisms. Only the endpoint to the web-user needs to be linked to Atmosphere. Is this a reasonable approach? |
|
Got it working with individual "Broadcaster" per user/session.
A few mods required: 0. minor: pingForStats() does not work when build/run from Eclipse, ${version} is not expanded. 1. AtmosphereServlet did not honor setting broadcasterClass in web.xml Fix: move the invocation of configureDefaultBroadcasterFactory() from the constructor and put it in init() *after* doInitParams(scFacade); [else the DefaultBroadcasterFactory is build using DefaultBroadcaster versus the given broadcasterClass] 3. There was a problem with [Chat]AtmosphereHandler writing to the response/resource. in AtmosphereResourceImpl.write(), getOutputStream() may be used, but later, AtmosphereHandler just uses getWriter() and thereupon fails. So, I added AtmosphereResource.write(String message) throws IOException, and implemented that with the existing AtmosphereResource.write() by adding the parameter, moving the try/catch block up to: if (flushComment) { try { write(beginCompatibleData); } catch (IOException ex) { loggger... } } And then replace res.getWriter().write(...)/flush() with resource.write(script); Now getWrite()/getOutputStream() are used consistently. 4. AtmosphereHandler.onRequest(resource) is invoked with the singleton 'resource' associated with the servlet's name; while that may be useful as a pan-application 'topic', we need more. The essential trick is to create and register a Broadcaster for each username/session. The new Broadcaster must be associated with the AtmosphereResourceImpl. The obvious way to do that was copy the code from AsynchronousProcessor: AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, broadcaster, req, res, this); req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re); That requires access to the appropriate CometSupport, so I added getCommetSupport() to the AtmosphereResource interface (and AtmosphereResourceImpl). This allows my AtmosphereHandler.onRequest(resource) to: cometSupport = resource.getCometSupport(); config = resource.getAtmosphereConfig(); AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, bc, req, res, cometSupport); req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re); Depending on the application requirements, one may cache that Broadcaster as necessary. [for me, I put in in the Session; if you have multiple session's per user, you may need to put it elsewhere, and attach each session/request to that Broadcaster] 5. The actual Broadcaster that I use creates a JMSMessageListener, the onMessage() of which extracts the message, and does broadcastReceiveMessage(message); The web application uses JMS to send messages to the queue/topic. Each user is listening to a dynamic queue named by the username (same as the Broadcaster name/id) and may listen to other topics as well. (TBD) This approach simplifies the usage of Atmosphere, as Atmosphere is now used only to supply the CometSupport to various protocols and app servers. And horizontal messaging is handled by JMS. Service 'RPC' requests from Http are managed by normal Spring @Controller dispatch, The AtmosphereServlet is just there to do async events to the client. Using Spring jmsTemplate, we just extract the destination from the REST url, (validate and secure, and find the senders username) and then jmsTemplate.convertAndSend(message); Presto: it comes out the onMessage() and is pushed to the client(s)! |
|
In reply to this post by jackparker
Salut,
On 10-12-03 4:52 PM, jackparker wrote: > > Oh, I see... you are using Atmosphere 'Broadcaster' to achieve messaging in a > single JVM/Classloader scope. > > So Atmosphere is really doing two jobs: a) a CPR to map Comet to various web > servers, > and B) replacing a JMS or other messaging system. Not exactly. Atmosphere has a Broadcaster concept that can be implemented as a JMS queue/topic. > > I assume (B) is a pragmatic development because the first/obvious demo of > Comet is for a Chat app. > But one could alternatively use JMS? > In that case, each web-user is associated with a 'Broadcaster' that is > listening to a JMS topic or queue. > (and in this case of 'push to individual users', the topic/queue is named by > the username) > > Then, when a message is to be delivered to the user, one uses the username > to lookup the JMS Destination > and send to that topic/queue, which triggers the Listener, which delivers > the message to Atmosphere > (using AbstractBroadcastProxy.broadcastReceivedMessage(message) as in the > JMSBroadcaster) > > The advantage here is that the *sender* of the message does not need to know > about Atmosphere, > and if you must span multiple servers/JVMs, you use the usual JMS/JNDI > mechanisms. > Only the endpoint to the web-user needs to be linked to Atmosphere. > > Is this a reasonable approach? Yes it is. Currently we do support: + JMS + Cluster + Redis + XMPP (those will ship in 0.7). They have respective Broadcaster implementation that can be used to exactly achieve what you described above. A+ - Jeanfrancois > > > |
|
In reply to this post by jackparker
Salut,
first, many thanks for the feedback. On 10-12-06 1:38 AM, jackparker wrote: > > Got it working with individual "Broadcaster" per user/session. > > A few mods required: > 0. minor: pingForStats() does not work when build/run from Eclipse, > ${version} is not expanded. Hum...really strange. I don't use Eclipse. Can you paste the log. > > 1. AtmosphereServlet did not honor setting broadcasterClass in web.xml > Fix: move the invocation of configureDefaultBroadcasterFactory() from the > constructor > and put it in init() *after* doInitParams(scFacade); > [else the DefaultBroadcasterFactory is build using DefaultBroadcaster > versus the given broadcasterClass] Let me check that. This is a regression for sure. > > 3. There was a problem with [Chat]AtmosphereHandler writing to the > response/resource. > in AtmosphereResourceImpl.write(), getOutputStream() may be used, > but later, AtmosphereHandler just uses getWriter() and thereupon fails. > So, I added AtmosphereResource.write(String message) throws IOException, > and implemented that with the existing AtmosphereResource.write() > by adding the parameter, moving the try/catch block up to: > if (flushComment) { > try { > write(beginCompatibleData); > } catch (IOException ex) { loggger... } > } > And then replace res.getWriter().write(...)/flush() with > resource.write(script); > Now getWrite()/getOutputStream() are used consistently. Can you paste the diff here? I'm not aware of any problems like that. There is also a property called: org.atmosphere.useStream that can be set to force application to only use stream or writer. > > > 4. AtmosphereHandler.onRequest(resource) is invoked with the singleton > 'resource' associated > with the servlet's name; while that may be useful as a pan-application > 'topic', we need more. > The essential trick is to create and register a Broadcaster for each > username/session. > > The new Broadcaster must be associated with the AtmosphereResourceImpl. > The obvious way to do that was copy the code from AsynchronousProcessor: > AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, > broadcaster, req, res, this); > req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re); > > That requires access to the appropriate CometSupport, > so I added getCommetSupport() to the AtmosphereResource interface (and > AtmosphereResourceImpl). > This allows my AtmosphereHandler.onRequest(resource) to: > cometSupport = resource.getCometSupport(); > config = resource.getAtmosphereConfig(); > AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, bc, req, > res, cometSupport); > req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re); Why not doing AtmosphereResourceImpl.setBroadcaster(...) directly instead of creating at AtmosphereResourceImpl? > Depending on the application requirements, one may cache that Broadcaster > as necessary. > [for me, I put in in the Session; if you have multiple session's per > user, > you may need to put it elsewhere, and attach each session/request to > that Broadcaster] Did you see the BroadcasterFactory? That API can also help when it is time to cache Broadcaster: http://is.gd/ihYb4 Again, share you diff :-) I'm not against the change, but I need to see it to make sure there is no regression with previous version. > > 5. The actual Broadcaster that I use creates a JMSMessageListener, the > onMessage() of which > extracts the message, and does broadcastReceiveMessage(message); > The web application uses JMS to send messages to the queue/topic. > Each user is listening to a dynamic queue named by the username (same as > the Broadcaster name/id) > and may listen to other topics as well. (TBD) > > This approach simplifies the usage of Atmosphere, as Atmosphere is now > used only > to supply the CometSupport to various protocols and app servers. > And horizontal messaging is handled by JMS. > Service 'RPC' requests from Http are managed by normal Spring @Controller > dispatch, > The AtmosphereServlet is just there to do async events to the client. > Using Spring jmsTemplate, we just extract the destination from the REST > url, > (validate and secure, and find the senders username) > and then jmsTemplate.convertAndSend(message); > Presto: it comes out the onMessage() and is pushed to the client(s)! This is great! Again, if you can share your changes so I can see if I can integrate them. Thanks! -- Jeanfrancois > |
|
Jeanfrancois,
Thanks for the informative response. Indeed, using setBroadcaster() on the existing Resource works! And if I do my own BroadcasterFactory, that would obviate the need for that. [that is: the BroadcasterFactory should inject/create the correct Broadcaster *before* giving it to the AtmosphereHander] 0: I have excluded atmosphere-ping so the problem is gone, but the essence was that because an illegal character in the URL at Line-66: URI.create(String.format("<a href="http://jfarcand.wordpress.com/ping-atmosphere-%s">http://jfarcand.wordpress.com/ping-atmosphere-%s", version)).toURL() because version was something like "${pom.version}" That is: the property file had not been filter/interpolated. This was only a problem because Eclipse running tomcat in debug mode wants to 'debug' the Exception that is thrown, and so would block. I was tempted to catch and handle better but just excluded atmosphere-ping.jar The problem with getWrite()/getOutputStream() is in the old version of ChatAtmosphereHandler, where it is hard-coded to use res.getWriter().write(...)/flush(); The only code I could find that checks PROPERTY_USE_STREAM was the write() method in AtmosphereResourceImpl So I hacked that to make it possible for the "Application" to use the same code to output the real data, as is used by the framework to send the flushComment/beginCompatibleData. :atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java index 6a9abd7..5200a01 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java @@ -174,6 +174,15 @@ public interface AtmosphereResource<E, F> { public void write(OutputStream os, Object o) throws IOException; /** + * Write the {@link String} using the {@link OutputStream} or {@link ServletWriter} + * depending on the setting of org.atmosphere.useStream from web.xml init-params. + * + * @param str {@link String} + * @throws java.io.IOException + */ + public void write(String str) throws IOException; + + /** * Get the {@link Serializer} or null if not defined. * * @return the {@link Serializer} or null if not defined. :atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java index 5e97729..b6fd700 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java @@ -201,7 +201,11 @@ public class AtmosphereResourceImpl implements } if (flushComment) { - write(); + try { + write(beginCompatibleData); + } catch (Throwable ex) { + LoggerUtils.getLogger().log(Level.WARNING, "", ex); + } } req.setAttribute(PRE_SUSPEND, "true"); action.type = AtmosphereServlet.Action.TYPE.SUSPEND; @@ -224,8 +228,13 @@ public class AtmosphereResourceImpl implements } } - void write() { - try { + /** + * write to this Response, using getWriter() or getOutputStream as indicated by useWriter + * or property PROPERTY_USE_STREAM=org.atmosphere.useStream; + */ + @Override + public void write(String message) throws IOException { +// try { if (useWriter && !((Boolean) req.getAttribute(AtmosphereServlet.PROPERTY_USE_STREAM))) { try { res.getWriter(); @@ -233,7 +242,7 @@ public class AtmosphereResourceImpl implements return; } - res.getWriter().write(beginCompatibleData); + res.getWriter().write(message); res.getWriter().flush(); } else { try { @@ -246,9 +255,9 @@ public class AtmosphereResourceImpl implements res.getOutputStream().flush(); } - } catch (Throwable ex) { - LoggerUtils.getLogger().log(Level.WARNING, "", ex); - } +// } catch (Throwable ex) { +// LoggerUtils.getLogger().log(Level.WARNING, "", ex); +// } } ----------------------------- Here is the diff for configureDefaultBroadcasterFactory for 0.7-SNAPSHOT of ~ 24/Nov/2010. [blobs 1 & 3 are the actual patch, blobs 2 & 4 are just to make stepping in the debugger easier.] :atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java index 80ee851..0c2989e --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java @@ -396,7 +396,6 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc public AtmosphereServlet(boolean isFilter) { this.isFilter = isFilter; readSystemProperties(); - configureDefaultBroadcasterFactory(); populateBroadcasterType(); } @@ -412,7 +411,9 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc } catch (ClassNotFoundException e) { LoggerUtils.getLogger().log(Level.SEVERE,"",e); } - BroadcasterFactory.setBroadcasterFactory(new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b), config); + Class bc = (b == null ? DefaultBroadcaster.class : b); + logger.log(Level.INFO, "configureDefaultBroadcasterFactor using class: "+bc); + BroadcasterFactory.setBroadcasterFactory(new DefaultBroadcasterFactory(bc), config); } @@ -563,6 +564,7 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc }; pingForStats(); doInitParams(scFacade); + configureDefaultBroadcasterFactory(); detectGoogleAppEngine(scFacade); loadConfiguration(scFacade); @@ -583,8 +585,8 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc protected void configureBroadcaster() throws ClassNotFoundException, InstantiationException, IllegalAccessException { if (broadcasterFactory == null) { - broadcasterFactory = new DefaultBroadcasterFactory((Class<Broadcaster>) - Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName)); + Class bc = (Class<Broadcaster>) Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName); + broadcasterFactory = new DefaultBroadcasterFactory(bc); config.broadcasterFactory = broadcasterFactory; } When I get my BroadcasterFactory working, and extends my JMSBroadcaster to include multiple topic/queue i'll send that; it may be a few weeks :) |
|
Salut,
On 10-12-06 1:52 PM, jackparker wrote: > > Jeanfrancois, > Thanks for the informative response. > Indeed, using setBroadcaster() on the existing Resource works! > And if I do my own BroadcasterFactory, that would obviate the need for that. > [that is: the BroadcasterFactory should inject/create the correct > Broadcaster *before* giving it to the AtmosphereHander] Currently I don't think you can do that unless I add some Filter concept. > > 0: I have excluded atmosphere-ping so the problem is gone, but the essence > was that because > an illegal character in the URL at Line-66: > URI.create(String.format("<a href="http://jfarcand.wordpress.com/ping-atmosphere-%s">http://jfarcand.wordpress.com/ping-atmosphere-%s", > version)).toURL() > because version was something like "${pom.version}" > That is: the property file had not been filter/interpolated. > This was only a problem because Eclipse running tomcat in debug mode wants > to 'debug' the Exception that is thrown, and so would block. > I was tempted to catch and handle better but just excluded > atmosphere-ping.jar I see. Strange Eclipse. > > The problem with getWrite()/getOutputStream() is in the old version of > ChatAtmosphereHandler, > where it is hard-coded to use res.getWriter().write(...)/flush(); > The only code I could find that checks PROPERTY_USE_STREAM was the write() > method in AtmosphereResourceImpl > So I hacked that to make it possible for the "Application" to use the same > code to output the real data, > as is used by the framework to send the flushComment/beginCompatibleData. > > :atmosphere>git diff > modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java > diff --git > a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java > b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java > index 6a9abd7..5200a01 > --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java > +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java > @@ -174,6 +174,15 @@ public interface AtmosphereResource<E, F> { > public void write(OutputStream os, Object o) throws IOException; > > /** > + * Write the {@link String} using the {@link OutputStream} or {@link > ServletWriter} > + * depending on the setting of org.atmosphere.useStream from web.xml > init-params. > + * > + * @param str {@link String} > + * @throws java.io.IOException > + */ > + public void write(String str) throws IOException; > + > + /** > * Get the {@link Serializer} or null if not defined. > * > * @return the {@link Serializer} or null if not defined. > > :atmosphere>git diff > modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java > diff --git > a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java > b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java > index 5e97729..b6fd700 > --- > a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java > +++ > b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java > @@ -201,7 +201,11 @@ public class AtmosphereResourceImpl implements > } > > if (flushComment) { > - write(); > + try { > + write(beginCompatibleData); > + } catch (Throwable ex) { > + LoggerUtils.getLogger().log(Level.WARNING, "", ex); > + } > } > req.setAttribute(PRE_SUSPEND, "true"); > action.type = AtmosphereServlet.Action.TYPE.SUSPEND; > @@ -224,8 +228,13 @@ public class AtmosphereResourceImpl implements > } > } > > - void write() { > - try { > + /** > + * write to this Response, using getWriter() or getOutputStream as > indicated by useWriter > + * or property PROPERTY_USE_STREAM=org.atmosphere.useStream; > + */ > + @Override > + public void write(String message) throws IOException { > +// try { > if (useWriter&& !((Boolean) > req.getAttribute(AtmosphereServlet.PROPERTY_USE_STREAM))) { > try { > res.getWriter(); > @@ -233,7 +242,7 @@ public class AtmosphereResourceImpl implements > return; > } > > - res.getWriter().write(beginCompatibleData); > + res.getWriter().write(message); > res.getWriter().flush(); > } else { > try { > @@ -246,9 +255,9 @@ public class AtmosphereResourceImpl implements > res.getOutputStream().flush(); > } > > - } catch (Throwable ex) { > - LoggerUtils.getLogger().log(Level.WARNING, "", ex); > - } > +// } catch (Throwable ex) { > +// LoggerUtils.getLogger().log(Level.WARNING, "", ex); > +// } > } I see. Let me think of it, but I'm not against your idea. > > > ----------------------------- > Here is the diff for configureDefaultBroadcasterFactory for 0.7-SNAPSHOT of > ~ 24/Nov/2010. > [blobs 1& 3 are the actual patch, blobs 2& 4 are just to make stepping in > the debugger easier.] > > :atmosphere>git diff > modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java > diff --git > a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java > b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java > index 80ee851..0c2989e > --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java > +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java > @@ -396,7 +396,6 @@ public class AtmosphereServlet extends > AbstractAsyncServlet implements CometProc > public AtmosphereServlet(boolean isFilter) { > this.isFilter = isFilter; > readSystemProperties(); > - configureDefaultBroadcasterFactory(); > populateBroadcasterType(); > } > > @@ -412,7 +411,9 @@ public class AtmosphereServlet extends > AbstractAsyncServlet implements CometProc > } catch (ClassNotFoundException e) { > LoggerUtils.getLogger().log(Level.SEVERE,"",e); > } > - BroadcasterFactory.setBroadcasterFactory(new > DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b), > config); > + Class bc = (b == null ? DefaultBroadcaster.class : b); > + logger.log(Level.INFO, "configureDefaultBroadcasterFactor using > class: "+bc); > + BroadcasterFactory.setBroadcasterFactory(new > DefaultBroadcasterFactory(bc), config); > } > > > @@ -563,6 +564,7 @@ public class AtmosphereServlet extends > AbstractAsyncServlet implements CometProc > }; > pingForStats(); > doInitParams(scFacade); > + configureDefaultBroadcasterFactory(); > detectGoogleAppEngine(scFacade); > loadConfiguration(scFacade); > > @@ -583,8 +585,8 @@ public class AtmosphereServlet extends > AbstractAsyncServlet implements CometProc > protected void configureBroadcaster() throws ClassNotFoundException, > InstantiationException, IllegalAccessException { > > if (broadcasterFactory == null) { > - broadcasterFactory = new > DefaultBroadcasterFactory((Class<Broadcaster>) > - > Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName)); > + Class bc = (Class<Broadcaster>) > Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName); > + broadcasterFactory = new DefaultBroadcasterFactory(bc); > config.broadcasterFactory = broadcasterFactory; > } > > When I get my BroadcasterFactory working, and extends my JMSBroadcaster to > include multiple topic/queue > i'll send that; it may be a few weeks :) Thanks a lot for the feedback. I will apply the patch above after I've ran all the tests. Thanks! -- Jeanfrancois > |
Agreed. would need some pluggable strategy to replace the map(servletPath) However, this is not not really needed. Now that I see what the DefaultBroadcasterFactory does, it took just a few hacks to my constructor/setID (to ignore the initial "default" id) and now (with thanks to Mike, the OP, for his 'one-liner') this works in MyAtomspherHander.onRequest: // resource.getBroadcaster() = map("/{servletPath}"); but we want the per-user Broadcaster String path = req.getServletPath(); String bcid = path+"/"+session.getUsername(); // also becomes the JMS Queue name BroadcasterFactory bf = BroadcasterFactory.getDefault(); Broadcaster bc = bf.lookup(MyJMSBroadcaster.class, bcid, true); // get or create/register MyJMSBroadcaster(bcid); bc.getBroadcasterConfig().addFilter(xssHtmlFilter); resource.setBroadcaster(bc); // add Broadcaster to resource resource.suspend(); // add resource to (Broadcaster)bc! As Mike said, all sessions/requests/Resources with the same username a) find the same Broadcaster, and B) are in the 'resources' list of that same Broadcaster So if any message comes to that Broadcaster, all sessions for that user are notified. MyJMSBroadcaster creates and listens to a dynamic JMS queue (using ApacheMQ) messages can be put in queue bc.broadcast(message) which wraps jmsTemplate.convertAndSend() or (more likely) by calling JMS directly without reference to Atmosphere. ps, one more tweak to your demo code: doing addFilter(new XSSHtmlFilter()) continues to stack more filters, because the 'new' filter is not found in the collection; so define a singleton xssHtmlFilter. |
|
Salut,
On 10-12-06 7:57 PM, jackparker wrote: > > > jfarcand-3 wrote: >> >> Salut, >> >>> And if I do my own BroadcasterFactory, that would obviate the need for >>> that. >>> [that is: the BroadcasterFactory should inject/create the correct >>> Broadcaster *before* giving it to the AtmosphereHander] >> >> Currently I don't think you can do that unless I add some Filter concept. >> > Agreed. would need some pluggable strategy to replace the map(servletPath) Indeed. > > However, this is not not really needed. Now that I see what the > DefaultBroadcasterFactory does, > it took just a few hacks to my constructor/setID (to ignore the initial > "default" id) and now > (with thanks to Mike, the OP, for his 'one-liner') this works in > MyAtomspherHander.onRequest: > // resource.getBroadcaster() = map("/{servletPath}"); but we > want the per-user Broadcaster > String path = req.getServletPath(); > String bcid = path+"/"+session.getUsername(); // also becomes > the JMS Queue name > BroadcasterFactory bf = BroadcasterFactory.getDefault(); > Broadcaster bc = bf.lookup(MyJMSBroadcaster.class, bcid, true); > // get or create/register MyJMSBroadcaster(bcid); > bc.getBroadcasterConfig().addFilter(xssHtmlFilter); > > resource.setBroadcaster(bc); // add Broadcaster to resource > resource.suspend(); // add resource to (Broadcaster)bc! > > As Mike said, all sessions/requests/Resources with the same username > a) find the same Broadcaster, and B) are in the 'resources' list of that > same Broadcaster > So if any message comes to that Broadcaster, all sessions for that user are > notified. > MyJMSBroadcaster creates and listens to a dynamic JMS queue (using ApacheMQ) > messages can be put in queue bc.broadcast(message) which wraps > jmsTemplate.convertAndSend() > or (more likely) by calling JMS directly without reference to Atmosphere. Ok the above make sense. > > > ps, one more tweak to your demo code: > doing addFilter(new XSSHtmlFilter()) continues to stack more filters, > because the 'new' filter is not found in the collection; so define a > singleton xssHtmlFilter. Hum...sound like a regression as the idea is to not allow duplicate. Looking.... FYI I've applied the fix for the custom Broadcaster https://github.com/Atmosphere/atmosphere/commit/29ba639120937677d6121f226587dd893acc2879 Thanks! -- Jeanfrancois > > > > |
|
For the record, here is MyJMSBroadcaster (using Spring JmsTemplate, but that could be changed)
I restructrured a bit from the JMSBroadcaster... I also had to hack around the super constructor's setID(); package my.pkg.comet; import my.pkg.domain.Member; // Spring/roo generated class import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.atmosphere.util.AbstractBroadcasterProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; /** * Broadcaster is used by some [My]AtmosphereHandler? * AtmosphereHandler and Broadcaster comprise a AtmosphereHandlerWrapper. * which is registered in AtmosphereServlet.handlers() by the Broadcaster.getID() [mapBroadcasterToAtmosphereHandler()] * [ also mapped from (servlet-path!, ahw); maybe bc.getID() is a servlet-path?? ] */ public class MyJMSBroadcaster extends AbstractBroadcasterProxy { private static final String TEMP_NAME = "MyJMSBroadcaster.unnamed"; static private JmsTemplate staticJmsTemplate; static public QueueConnectionFactory getQueueConnectionFactory() { return (QueueConnectionFactory)staticJmsTemplate.getConnectionFactory(); } // roundabout way for Spring to inject: static staticJmsTemplate = jmsTemplate; public static class JmsTemplateBean { public JmsTemplateBean(JmsTemplate jmsTemplate) { staticJmsTemplate = jmsTemplate; } } // inner-class, with binding through pushIfAble() to broadcastRecievedMessage() public class JMSListener implements MessageListener { // sendMessage()/jmsConvertAndSend() is delivered here, // unwrap from JMS and push the message to the waiting client via comet response // message formatting/sending is done in MyAtmosphereHandler.onStateChange @Override public void onMessage(Message msg) { try { TextMessage textMessage = (TextMessage) msg; String message = textMessage.getText(); MyJMSBroadcaster.this.pushIfAble(message); } catch (JMSException ex) { log.warn("JMSListener failed to push message: "+msg, ex); } } } private final Logger log = LoggerFactory.getLogger(MyJMSBroadcaster.class); private QueueConnection qconnection; private QueueSession qsession; private Queue queue; private QueueReceiver qreceiver; private QueueSender qsender; private MessageListener messageListener = new JMSListener(); // interface between JMSListener and MyJMSBroadcaster: private void pushIfAble(Object message) { if (message != null && bc != null) { log.detail("pushIfAble: resources={}, message={}",resources, message); broadcastReceivedMessage(message); // push(new Entry(filter(newMsg), null, new BroadcasterFuture |
|
Salut,
thanks for sharing the code. I've first merged Christopher's proposal (and exposed AtmosphereConfig to Broadcaster so init param can be used): http://is.gd/irsnA will it cover your use case as well? If not can you tell me what's missing? Thanks! -- Jeanfrancois On 10-12-08 8:16 PM, jackparker wrote: > > For the record, here is MyJMSBroadcaster (using Spring JmsTemplate, but that > could be changed) > I restructrured a bit from the JMSBroadcaster... > I also had to hack around the super constructor's setID(); > > package my.pkg.comet; > > import my.pkg.domain.Member; // Spring/roo generated class > > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageListener; > import javax.jms.Queue; > import javax.jms.QueueConnection; > import javax.jms.QueueConnectionFactory; > import javax.jms.QueueReceiver; > import javax.jms.QueueSender; > import javax.jms.QueueSession; > import javax.jms.Session; > import javax.jms.TextMessage; > > import org.atmosphere.util.AbstractBroadcasterProxy; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import org.springframework.beans.factory.annotation.Autowired; > import org.springframework.jms.core.JmsTemplate; > > /** > * Broadcaster is used by some [My]AtmosphereHandler? > * AtmosphereHandler and Broadcaster comprise a AtmosphereHandlerWrapper. > * which is registered in AtmosphereServlet.handlers() by the > Broadcaster.getID() [mapBroadcasterToAtmosphereHandler()] > * [ also mapped from (servlet-path!, ahw); maybe bc.getID() is a > servlet-path?? ] > */ > public class MyJMSBroadcaster extends AbstractBroadcasterProxy { > private static final String TEMP_NAME = "MyJMSBroadcaster.unnamed"; > > static private JmsTemplate staticJmsTemplate; > static public QueueConnectionFactory getQueueConnectionFactory() { > return (QueueConnectionFactory)staticJmsTemplate.getConnectionFactory(); > } > > // roundabout way for Spring to inject: static staticJmsTemplate = > jmsTemplate; > public static class JmsTemplateBean { > public JmsTemplateBean(JmsTemplate jmsTemplate) { > staticJmsTemplate = jmsTemplate; > } > } > > // inner-class, with binding through pushIfAble() to > broadcastRecievedMessage() > public class JMSListener implements MessageListener { > // sendMessage()/jmsConvertAndSend() is delivered here, > // unwrap from JMS and push the message to the waiting client via comet > response > // message formatting/sending is done in MyAtmosphereHandler.onStateChange > @Override > public void onMessage(Message msg) { > try { > TextMessage textMessage = (TextMessage) msg; > String message = textMessage.getText(); > MyJMSBroadcaster.this.pushIfAble(message); > > } catch (JMSException ex) { > log.warn("JMSListener failed to push message: "+msg, ex); > } > } > } > > private final Logger log = > LoggerFactory.getLogger(MyJMSBroadcaster.class); > private QueueConnection qconnection; > private QueueSession qsession; > private Queue queue; > private QueueReceiver qreceiver; > private QueueSender qsender; > private MessageListener messageListener = new JMSListener(); > > // interface between JMSListener and MyJMSBroadcaster: > private void pushIfAble(Object message) { > if (message != null&& bc != null) { > log.detail("pushIfAble: resources={}, message={}",resources, message); > broadcastReceivedMessage(message); > // push(new Entry(filter(newMsg), null, new BroadcasterFuture(newMsg), > message)); > // > resources[].push(message)->onStateChange()->resource.write(format(message)) > } > } > > private void updateConnections() { > String queueId = getQueueName(); > if (queueId.equals(TEMP_NAME)) > return; // take no action until setID() with real/final name > try { > qconnection = getConnectionFactory().createQueueConnection(); > qsession = qconnection.createQueueSession(false, > Session.AUTO_ACKNOWLEDGE); > // create a non-portable "queue identity" > // or use jndiContext.lookup("dynamicQueues/"+getQueueName()) > // if (queue != null) queue.close(); > if (qsender != null) qsender.close(); > queue = qsession.createQueue(queueId); > qsender = qsession.createSender(queue); // mostly unused in My > setMessageSelector(msgSelector); > } catch (JMSException ex) { > log.warn("Could not update Connection/Queue", ex); > throw new IllegalStateException("Could not update Connection/Queue", ex); > } > } > > private QueueConnectionFactory connectionFactory; > public QueueConnectionFactory getConnectionFactory() { return > this.connectionFactory; } > @Autowired > public void setConnectionFactory(QueueConnectionFactory connectionFactory) > { > this.connectionFactory = connectionFactory; > updateConnections(); // sets qreceiver, et al. > start(); > } > > private String msgSelector = ""; // empty selector same as null > public String getMessageSelector() { return this.msgSelector; } > public void setMessageSelector(String msgSelector) { > this.msgSelector = msgSelector; // we could test for actual change of > msgSelector... > setListeners(); // update the qreceiver& setListeners: > } > > // Super-class [AbstractBroadcasterProxy] invokes start() in its > constructor, > // but that is before the connnetionFactory can be set. > // [constructor injection would not help, as super(id) must be invoked > before setting local fields] > // so we interpose here to avoid setting the one-shot 'started' > AtomicBoolean > // and re-invoke start() after/when connectionFactory is set; > protected void start() { > if (getConnectionFactory() == null) { > return; // try again later... > } > super.start(); // set started, create BroadcastHandler and > setListeners > } > > private String getQueueName() { > return getID(); // use id as queue name... > // assert: (getID() == TEMP_NAME ? getQueueName() == TEMP_NAME : true) > } > > // convenience method so we can keep queue name logic in one file. > // send message to member on the Servlet(req) > /** > * Send a Comet event to a client application. > *<p> > * If you already know the client's queue name, you can: > *<code>jmsTemplate.convertAndSend(queueName, message)</code> > * > * @param path the servletPath used by client comet request (identifies the > application or domain) > * @param member a myDS-authenticated Member, member.getUsername() is used. > * @param message the String to be sent to the client as a comet event > */ > public static void sendMessage(String path, Member member, String message) > { > String queueName = getQueueName(path, member); > staticJmsTemplate.convertAndSend(queueName, message); > } > > // create a receiver/listener for messages to this member on the > Servlet(req) > public static MyJMSBroadcaster newMyJMSBroadcaster(String path, Member > member) { > String queueName = getQueueName(path, member); > return new MyJMSBroadcaster(queueName); > } > > private static String getQueueName(String path, Member member) { > //String path = req.getServletPath(); > return path+"/"+member.getUsername(); > } > > @Override > public void setID(String id) { > super.setID(id); > updateConnections(); > } > > public MyJMSBroadcaster() { > this(TEMP_NAME); > } > > public MyJMSBroadcaster(String id) { > // we would rather that Spring could inject ConnectionFactory, but for now > we use self-service: > // move ConnectionFactory from lastConnectionFactoryHolder to this: > this(id, getQueueConnectionFactory()); > } > > // if we can ever create this Broadcaster from Spring, supply the > ConnectionFactory to be used: > /** > * @param id for broadcaster.getID() > * @param queueConnectionFactory typically a cachingConnectionFactory from > Spring. > */ > public MyJMSBroadcaster(String id, QueueConnectionFactory > queueConnectionFactory) { > super(id); // set name, setID, empty BroadcasterCache, new > BroadcasterConfig > setConnectionFactory(queueConnectionFactory); > } > > // might generalize this to listen to user's Queue and application's Topic > // Q: is it better to have A) 1 "ALL" topic + 1000s of per-user queues + > subtopic queues > // or B) 1 APP queue with selection filters for "TO=${me} || TO=${ALL} || > TO=${TOPIC-7}" > // if we can pre-filter when the target group is small (and so not send to > each machine) > // then plan B is simpler and more flexible. > // else the sender needs to expand the 'group-members-list' > // A) setSelector(String str...) { > createReceiver(getQueueByName(str)).addListener();... } > // B) setSelector(String str...) { msgSelector += "TO=$str || ..."; > getQ().createRec(mS).addListener(); } > // We like (A); just send the DestinationNames (queue://user, topic://all, > topic://game1) > private void setListeners() { > try { > QueueReceiver qr = qreceiver; // save until new qreceiver is configured > qreceiver = qsession.createReceiver(queue, msgSelector); > if (qr != null) > qr.close(); // kill old qreceiver; > > qreceiver.setMessageListener(messageListener); > log.detail("set MessageListener for queue {}", queue); > } catch (JMSException ex) { > log.warn("Could not update MessageListener for "+queue, ex); > throw new IllegalStateException("Could not create > MessageListener for "+queue, ex); > } > } > > // incomingBroadcast is the 'run()' method of the resulting > BroadcastHandler: > // invoked by ExecutorService when Broadcaster.start() { > // bc.getExecutorService().submit(getBroadcastHandler()); > // } > // start() is called in ABP.constructor(id) [before msgSelector is set!] > // rely that 'this.incomingBroadcast() is blocked in Executor until > constructor has completed?! > // note to self: maybe select on JMSTimestamp > // > // DefaultBroadcaster is way convoluted... > // each call to broadcast(msg) calls: {start(); messages.offer(new > Entry(newMsg,null,future,msg); return future;} > // start() has a one-shot that calls > executorService.submit(getBroadcastHandler()) > // getBroadcastHandler() [misnomer] returns a Runnable [which is > interesting for DefaultBroadcaster > // because it continues to re-submit itself to push(messages.take())] > // Hmm: the Entry msg holds a [Broadcaster]Future, and that future is > done/canceled. > // > // push(Entry) iterates multiple resources on the Broadcaster; delivering > to push(resource, finalMsg) > // push(r,msg) checks lifecycle, then calls: broadcast(r,e(r,msg)) > // broadcast(r,e) finds the AtmosphereHandler and calls > AtmosphereHandler.onStateChange(e); > // presumably each Resource to which one is broadcasting can have its own > Handler. > // onStateChange(e) does e.resource.write(toJson(message)) > > // but: once the JMS conection is started (which AbstractBroadcasterProxy > does in contstuctor) > // [we finesse all that, just doing staticJmsTemplate.convertAndSend(msg)] > // > // when a message is received from JMS we call: > broadcastReceivedMessage(message) > // broadcastReceivedMessage(message) calls push(new Entry(filter(message), > null, Future, message)); > // push(Entry) -> push(r,msg) -> > broadcast(r,e)->handler.onStateChange(e)->e.r.write(toJson(message)) > // BUT: AbstractBroadcasterProxy sets writeLocally=false, so nothing is > emitted! > > /** > * {@inheritDoc} > */ > @Override > public void incomingBroadcast() { > try { > qconnection.start(); > } catch (JMSException ex) { > log.warn("Unable to initialize BroadcastHandler", ex); > throw new IllegalStateException("Unable to initialize > BroadcastHandler", ex); > } > } > > // AbstractBroadcasterProxy.broadcast(...) comes here; the subsequent > push(Entry) has little effect > > // Forward message to this queue/topic/destination Listeners [ie: > this.messageListener] > // where it emerges on the set of resources assigned to this Broadcaster. > // The application uses JmsTemplate to send to queue/topic. > [MyJMSBroadcaster.sendMessage()] > // Note: the Chat/demo code may call bc.broadcast("suspended") or some such > /** > * {@inheritDoc} > */ > @Override > public void outgoingBroadcast(final Object message) { > try { > // Note: JmsTemplate creates a producer for each send! > finally{closeProducer()} > // staticJmsTemplate.send(queue, new MessageCreator() { > // Message createMessage(Session session) { > // return session.createTextMessage(message.toString()); > // } > // }); > qsender.send(qsession.createTextMessage(message.toString())); > //if (qos) producer.send(message, deliveryMode, priority, ttl); > } catch (JMSException ex) { > log.warn("Unable to send outgoingBroadcast", ex); > } > } > } > |
| Powered by Nabble | Edit this page |
