Quantcast

Finding my way - push to individual users

classic Classic list List threaded Threaded
34 messages Options
12
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Finding my way - push to individual users

mikeD
Hello,

i'm writing an application in which screens a little section shows some infos to the user that are pushed by the server (kind of events the user is interested in, like the status of some previous long running actions he undertook). The user might have multiple browsers open and I want to notify all those (thus basing my push upon the user login for exemple).

I found lot of docs and always the same sample with broadcasts, but what about this kind of use case ?

We plan to start with 4000 users and are working with Weblogic 10 servers. I know this post is very general, but I tried to find my way in all samples, docs, posts, and haven't found anything explicit for so far.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

jfarcand
Administrator
Salut,

Using Broadcaster#broadcast(events, AtmosphereResource)

    http://is.gd/gS5Av

is what you want to use, e.g you broadcast a message to a single user. You can also do Broadcaster#broadcast(events, Set<AtmosphereResource>)

     http://is.gd/gS5Df

to broadcast to a subset of user.

Does that help?

-- Jeanfrancois
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

mikeD
OK, I thought I'd have to use a resume instead of a broadcast since it was to continue a response (or 2 responses)...

But what I have more difficulties is to see how to use all this with the client side... I'd like to be able to use any comet protocol or websocket...

Do I have to use jQuery and Meteor ? While reading all this, I thought I'd have to use Meteor, have my own servlet that would suspend all incoming get's and that would resume (or broadcast based on your reply) the response to the user for who I receive an update from another request. Those requests come to the servlet from another server application, pushing updates for a precise logged in user.

I even thought one moment to use the pub/sub, based on a channel per login... but it seems maybe a little bit overkill...

Would you have any pointer / direction to send me ?

Thx in advance,

Mike

2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email]>
Salut,

Using Broadcaster#broadcast(events, AtmosphereResource)

    http://is.gd/gS5Av

is what you want to use, e.g you broadcast a message to a single user. You can also do Broadcaster#broadcast(events, Set<AtmosphereResource>)

     http://is.gd/gS5Df

to broadcast to a subset of user.

Does that help?

-- Jeanfrancois




Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
Salut,

On 10-11-10 3:59 AM, mikeD wrote:
> OK, I thought I'd have to use a resume instead of a broadcast since it
> was to continue a response (or 2 responses)...

Can you elaborate a little bit? Broadcast will send an message to the
remote client. You can resume on broadcast (long polling) or leave the
connection open (http-streaming, websocket).


>
> But what I have more difficulties is to see how to use all this with the
> client side... I'd like to be able to use any comet protocol or websocket...

Take a look at the Atmosphere JQuery Plugin:

    http://is.gd/gTVYN

The sample jquery-* all support all transport.

   http://is.gd/frznC

>
> Do I have to use jQuery and Meteor ?

The same way described in the above blog. Take a look at the Wicket
sample, which use Meteor to support Comet and Websocket with JQuery Plugin

    http://is.gd/fOTK6

You can download the sample using the link above.


While reading all this, I thought
> I'd have to use Meteor, have my own servlet that would suspend all
> incoming get's and that would resume (or broadcast based on your reply)
> the response to the user for who I receive an update from another
> request.

OK if I understand properly, you don't want to broadcast events to all
users (browser), but instead you want to always broadcast to a single user:

> Meteor m = Meteor.build(req);
> // Set a Broadcaster per Meteor. Use the servlet-path for ID
> m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath());
>
> m.suspend();

Then when you are ready to broadcast event:

> m.broadcast("message").resume();


Those requests come to the servlet from another server
> application, pushing updates for a precise logged in user.
>
> I even thought one moment to use the pub/sub, based on a channel per
> login... but it seems maybe a little bit overkill...
>
> Would you have any pointer / direction to send me ?

Let me know if that help!

-- Jeanfrancois


>
> Thx in advance,
>
> Mike
>
> 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email]
> </user/SendEmail.jtp?type=node&node=5723935&i=0>>
>
>     Salut,
>
>     Using Broadcaster#broadcast(events, AtmosphereResource)
>
>     http://is.gd/gS5Av
>
>     is what you want to use, e.g you broadcast a message to a single
>     user. You can also do Broadcaster#broadcast(events,
>     Set<AtmosphereResource>)
>
>     http://is.gd/gS5Df
>
>     to broadcast to a subset of user.
>
>     Does that help?
>
>     -- Jeanfrancois
>
>     ------------------------------------------------------------------------
>     View message @
>     http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t>
>
>     To unsubscribe from Finding my way - push to individual users, click
>     here
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>.
>
>
>
>
> ------------------------------------------------------------------------
> View this message in context: Re: Finding my way - push to individual
> users
> <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html>
> Sent from the Atmosphere users mailling list mailing list archive
> <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at
> Nabble.com.
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

mikeD
Hi,

I start to understand  the mechanism now, while reading all samples...

when you say that 
// Set a Broadcaster per Meteor. Use the servlet-path for ID
m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath());

Maybe i'm wrong but i think this will create an instance of Broadcaster per servlet path and then set it on the Meteor... ending in having a broadcaster instance for the servlet path, which is the same for every user... wouldn't it have to be created using the username as Id ? Or maybe you assume that the servlet path is unique per user, meaning that I would have something like /MessageHandler/jfarcand for you and /MessageHandler/mike for me.... is the latest what you assume ?

Mike

2010/11/10 jfarcand <[hidden email]>
Salut,


On 10-11-10 3:59 AM, mikeD wrote:
OK, I thought I'd have to use a resume instead of a broadcast since it
was to continue a response (or 2 responses)...

Can you elaborate a little bit? Broadcast will send an message to the remote client. You can resume on broadcast (long polling) or leave the connection open (http-streaming, websocket).




But what I have more difficulties is to see how to use all this with the
client side... I'd like to be able to use any comet protocol or websocket...

Take a look at the Atmosphere JQuery Plugin:

  http://is.gd/gTVYN

The sample jquery-* all support all transport.

 http://is.gd/frznC



Do I have to use jQuery and Meteor ?

The same way described in the above blog. Take a look at the Wicket sample, which use Meteor to support Comet and Websocket with JQuery Plugin

  http://is.gd/fOTK6

You can download the sample using the link above.



While reading all this, I thought
I'd have to use Meteor, have my own servlet that would suspend all
incoming get's and that would resume (or broadcast based on your reply)
the response to the user for who I receive an update from another
request.

OK if I understand properly, you don't want to broadcast events to all users (browser), but instead you want to always broadcast to a single user:

Meteor m = Meteor.build(req);
// Set a Broadcaster per Meteor. Use the servlet-path for ID
m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath());

m.suspend();

Then when you are ready to broadcast event:

m.broadcast("message").resume();


Those requests come to the servlet from another server
application, pushing updates for a precise logged in user.

I even thought one moment to use the pub/sub, based on a channel per
login... but it seems maybe a little bit overkill...

Would you have any pointer / direction to send me ?

Let me know if that help!

-- Jeanfrancois



Thx in advance,

Mike

2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email]
</user/SendEmail.jtp?type=node&node=5723935&i=0>>


   Salut,

   Using Broadcaster#broadcast(events, AtmosphereResource)

   http://is.gd/gS5Av

   is what you want to use, e.g you broadcast a message to a single
   user. You can also do Broadcaster#broadcast(events,
   Set<AtmosphereResource>)

   http://is.gd/gS5Df

   to broadcast to a subset of user.

   Does that help?

   -- Jeanfrancois

   ------------------------------------------------------------------------
   View message @
   http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html
   <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t>


   To unsubscribe from Finding my way - push to individual users, click
   here
   <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>.




------------------------------------------------------------------------
View this message in context: Re: Finding my way - push to individual
users
<http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html>

Sent from the Atmosphere users mailling list mailing list archive
<http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at
Nabble.com.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

mikeD
In reply to this post by Jeanfrancois Arcand-4
Hello,

goign further with code... I try to have a webpage in which the logged in user will receive messages sent by the system (various applications). Only the targeted user should receive the messages, they are personnal.

I wrote a web page (html), setup a webapp with 2 servlets :
- Meteor managing my first servlet (ContextPublisher)
- my second servlet (BroadcastServlet).

The first one is used by the page to get the messages
The second one is called by other apps (myself with another browser for the moment) to send messages (Strings) to a given user.

I see in the server logs that when I open the webpage in a browser, it does a GET on the first servlet and is suspended.
I see in the logs that when I call the second servlet, a broadcast will be issued to the right user-related Broadcasters

I don't see the onBroadcast fired.
The webpage never receive the message.

Based on this, I assume this is not the right way to broadcast, but what is wrong ?

In attachment, the code for the page and the 2 servlets.

Thanks in advance for your help !


2010/11/10 jfarcand-3 [via Atmosphere users mailling list] <[hidden email]>
Salut,

On 10-11-10 3:59 AM, mikeD wrote:
> OK, I thought I'd have to use a resume instead of a broadcast since it
> was to continue a response (or 2 responses)...

Can you elaborate a little bit? Broadcast will send an message to the
remote client. You can resume on broadcast (long polling) or leave the
connection open (http-streaming, websocket).


>
> But what I have more difficulties is to see how to use all this with the
> client side... I'd like to be able to use any comet protocol or websocket...

Take a look at the Atmosphere JQuery Plugin:

    http://is.gd/gTVYN

The sample jquery-* all support all transport.

   http://is.gd/frznC

>
> Do I have to use jQuery and Meteor ?

The same way described in the above blog. Take a look at the Wicket
sample, which use Meteor to support Comet and Websocket with JQuery Plugin

    http://is.gd/fOTK6

You can download the sample using the link above.


While reading all this, I thought
> I'd have to use Meteor, have my own servlet that would suspend all
> incoming get's and that would resume (or broadcast based on your reply)
> the response to the user for who I receive an update from another
> request.

OK if I understand properly, you don't want to broadcast events to all
users (browser), but instead you want to always broadcast to a single user:

> Meteor m = Meteor.build(req);
> // Set a Broadcaster per Meteor. Use the servlet-path for ID
> m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class, req.getServletPath());
>
> m.suspend();

Then when you are ready to broadcast event:

> m.broadcast("message").resume();


Those requests come to the servlet from another server
> application, pushing updates for a precise logged in user.
>
> I even thought one moment to use the pub/sub, based on a channel per
> login... but it seems maybe a little bit overkill...
>
> Would you have any pointer / direction to send me ?

Let me know if that help!

-- Jeanfrancois


>
> Thx in advance,
>
> Mike
>
> 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden email]
> </user/SendEmail.jtp?type=node&node=5723935&i=0>>
>
>     Salut,

>
>     Using Broadcaster#broadcast(events, AtmosphereResource)
>
>     http://is.gd/gS5Av
>
>     is what you want to use, e.g you broadcast a message to a single
>     user. You can also do Broadcaster#broadcast(events,
>     Set<AtmosphereResource>)
>
>     http://is.gd/gS5Df
>
>     to broadcast to a subset of user.
>
>     Does that help?
>
>     -- Jeanfrancois
>
>     ------------------------------------------------------------------------
>     View message @
>     http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t>
>
>     To unsubscribe from Finding my way - push to individual users, click
>     here
> View this message in context: Re: Finding my way - push to individual
> users
> <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at
> Nabble.com.






context-1.html (2K) Download Attachment
BroadcastServlet.java (1K) Download Attachment
ContextPublisher.java (3K) Download Attachment
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
In reply to this post by mikeD
Salut,

On 10-11-16 2:50 AM, Michael Dewitte wrote:

> Hi,
>
> I start to understand  the mechanism now, while reading all samples...
>
> when you say that
> // Set a Broadcaster per Meteor. Use the servlet-path for ID
> m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class,
> req.getServletPath());
>
> Maybe i'm wrong but i think this will create an instance of Broadcaster
> per servlet path and then set it on the Meteor... ending in having a
> broadcaster instance for the servlet path, which is the same for every
> user...

Yes, for the clock sample I wanted to use a single Broadcaster/


  wouldn't it have to be created using the username as Id ?

You could, but that means only that user will received events when you
invoke Broadcaster.broadcast. No events will be shared.


Or
> maybe you assume that the servlet path is unique per user, meaning that
> I would have something like /MessageHandler/jfarcand for you and
> /MessageHandler/mike for me.... is the latest what you assume ?

It really an application decision. If you write a chat, you will create
a Meteor per chat room. In the sample I didn't needed that.

Thanks!

-- jeanfrancois



>
> Mike
>
> 2010/11/10 jfarcand <[hidden email] <mailto:[hidden email]>>
>
>     Salut,
>
>
>     On 10-11-10 3:59 AM, mikeD wrote:
>
>         OK, I thought I'd have to use a resume instead of a broadcast
>         since it
>         was to continue a response (or 2 responses)...
>
>
>     Can you elaborate a little bit? Broadcast will send an message to
>     the remote client. You can resume on broadcast (long polling) or
>     leave the connection open (http-streaming, websocket).
>
>
>
>
>         But what I have more difficulties is to see how to use all this
>         with the
>         client side... I'd like to be able to use any comet protocol or
>         websocket...
>
>
>     Take a look at the Atmosphere JQuery Plugin:
>
>     http://is.gd/gTVYN
>
>     The sample jquery-* all support all transport.
>
>     http://is.gd/frznC
>
>
>
>         Do I have to use jQuery and Meteor ?
>
>
>     The same way described in the above blog. Take a look at the Wicket
>     sample, which use Meteor to support Comet and Websocket with JQuery
>     Plugin
>
>     http://is.gd/fOTK6
>
>     You can download the sample using the link above.
>
>
>
>     While reading all this, I thought
>
>         I'd have to use Meteor, have my own servlet that would suspend all
>         incoming get's and that would resume (or broadcast based on your
>         reply)
>         the response to the user for who I receive an update from another
>         request.
>
>
>     OK if I understand properly, you don't want to broadcast events to
>     all users (browser), but instead you want to always broadcast to a
>     single user:
>
>         Meteor m = Meteor.build(req);
>         // Set a Broadcaster per Meteor. Use the servlet-path for ID
>         m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class,
>         req.getServletPath());
>
>         m.suspend();
>
>
>     Then when you are ready to broadcast event:
>
>         m.broadcast("message").resume();
>
>
>
>     Those requests come to the servlet from another server
>
>         application, pushing updates for a precise logged in user.
>
>         I even thought one moment to use the pub/sub, based on a channel per
>         login... but it seems maybe a little bit overkill...
>
>         Would you have any pointer / direction to send me ?
>
>
>     Let me know if that help!
>
>     -- Jeanfrancois
>
>
>
>         Thx in advance,
>
>         Mike
>
>         2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden
>         email]
>         </user/SendEmail.jtp?type=node&node=5723935&i=0>>
>
>
>             Salut,
>
>             Using Broadcaster#broadcast(events, AtmosphereResource)
>
>         http://is.gd/gS5Av
>
>             is what you want to use, e.g you broadcast a message to a single
>             user. You can also do Broadcaster#broadcast(events,
>             Set<AtmosphereResource>)
>
>         http://is.gd/gS5Df
>
>             to broadcast to a subset of user.
>
>             Does that help?
>
>             -- Jeanfrancois
>
>
>           ------------------------------------------------------------------------
>             View message @
>         http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html
>         <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t>
>
>
>             To unsubscribe from Finding my way - push to individual
>         users, click
>             here
>         <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t
>         <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>>.
>
>
>
>
>         ------------------------------------------------------------------------
>         View this message in context: Re: Finding my way - push to
>         individual
>         users
>         <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html>
>
>         Sent from the Atmosphere users mailling list mailing list archive
>         <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at
>         Nabble.com.
>
>
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
In reply to this post by mikeD
Salut,

On 10-11-16 8:38 AM, mikeD wrote:

> Hello,
>
> goign further with code... I try to have a webpage in which the logged
> in user will receive messages sent by the system (various applications).
> Only the targeted user should receive the messages, they are personnal.
>
> I wrote a web page (html), setup a webapp with 2 servlets :
> - Meteor managing my first servlet (ContextPublisher)
> - my second servlet (BroadcastServlet).
>
> The first one is used by the page to get the messages
> The second one is called by other apps (myself with another browser for
> the moment) to send messages (Strings) to a given user.
>
> I see in the server logs that when I open the webpage in a browser, it
> does a GET on the first servlet and is suspended.
> I see in the logs that when I call the second servlet, a broadcast will
> be issued to the right user-related Broadcasters
>
> I don't see the onBroadcast fired.
> The webpage never receive the message.

I suspect the broiadcaster used in second Servlet is not the proper
one.Can you send me your code / war at jfarcand [at] apache [dot] org?

>
> Based on this, I assume this is not the right way to broadcast, but what
> is wrong ?

Right.

>
> In attachment, the code for the page and the 2 servlets.
>
> Thanks in advance for your help !

You are welcome.

A+

--Jeanfrancois


>
>
> 2010/11/10 jfarcand-3 [via Atmosphere users mailling list] <[hidden
> email] </user/SendEmail.jtp?type=node&node=5743825&i=0>>
>
>     Salut,
>
>     On 10-11-10 3:59 AM, mikeD wrote:
>      > OK, I thought I'd have to use a resume instead of a broadcast
>     since it
>      > was to continue a response (or 2 responses)...
>
>     Can you elaborate a little bit? Broadcast will send an message to the
>     remote client. You can resume on broadcast (long polling) or leave the
>     connection open (http-streaming, websocket).
>
>
>      >
>      > But what I have more difficulties is to see how to use all this
>     with the
>      > client side... I'd like to be able to use any comet protocol or
>     websocket...
>
>     Take a look at the Atmosphere JQuery Plugin:
>
>     http://is.gd/gTVYN
>
>     The sample jquery-* all support all transport.
>
>     http://is.gd/frznC
>
>      >
>      > Do I have to use jQuery and Meteor ?
>
>     The same way described in the above blog. Take a look at the Wicket
>     sample, which use Meteor to support Comet and Websocket with JQuery
>     Plugin
>
>     http://is.gd/fOTK6
>
>     You can download the sample using the link above.
>
>
>     While reading all this, I thought
>      > I'd have to use Meteor, have my own servlet that would suspend all
>      > incoming get's and that would resume (or broadcast based on your
>     reply)
>      > the response to the user for who I receive an update from another
>      > request.
>
>     OK if I understand properly, you don't want to broadcast events to all
>     users (browser), but instead you want to always broadcast to a
>     single user:
>
>      > Meteor m = Meteor.build(req);
>      > // Set a Broadcaster per Meteor. Use the servlet-path for ID
>      >
>     m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class,
>     req.getServletPath());
>      >
>      > m.suspend();
>
>     Then when you are ready to broadcast event:
>
>      > m.broadcast("message").resume();
>
>
>     Those requests come to the servlet from another server
>      > application, pushing updates for a precise logged in user.
>      >
>      > I even thought one moment to use the pub/sub, based on a channel per
>      > login... but it seems maybe a little bit overkill...
>      >
>      > Would you have any pointer / direction to send me ?
>
>     Let me know if that help!
>
>     -- Jeanfrancois
>
>
>      >
>      > Thx in advance,
>      >
>      > Mike
>      >
>      > 2010/11/9 jfarcand [via Atmosphere users mailling list] <[hidden
>     email]
>      > </user/SendEmail.jtp?type=node&node=5723935&i=0>>
>      >
>      >     Salut,
>
>      >
>      >     Using Broadcaster#broadcast(events, AtmosphereResource)
>      >
>      > http://is.gd/gS5Av
>      >
>      >     is what you want to use, e.g you broadcast a message to a single
>      >     user. You can also do Broadcaster#broadcast(events,
>      >     Set<AtmosphereResource>)
>      >
>      > http://is.gd/gS5Df
>      >
>      >     to broadcast to a subset of user.
>      >
>      >     Does that help?
>      >
>      >     -- Jeanfrancois
>      >
>      >
>     ------------------------------------------------------------------------
>
>      >     View message @
>      >
>     http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t&by-user=t>
>      >
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5721537.html?by-user=t&by-user=t&by-user=t>>
>
>      >
>      >     To unsubscribe from Finding my way - push to individual
>     users, click
>      >     here
>      >
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t&by-user=t&by-user=t>>.
>
>      >
>      >
>      >
>      >
>      >
>     ------------------------------------------------------------------------
>
>      > View this message in context: Re: Finding my way - push to
>     individual
>      > users
>      >
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5723935.html?by-user=t&by-user=t>>
>
>      > Sent from the Atmosphere users mailling list mailing list archive
>      > <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/?by-user=t&by-user=t>>
>     at
>      > Nabble.com.
>
>
>     ------------------------------------------------------------------------
>     View message @
>     http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5724753.html
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5724753.html?by-user=t>
>
>     To unsubscribe from Finding my way - push to individual users, click
>     here
>     <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/template/TplServlet.jtp?tpl=unsubscribe_by_code&node=5721315&code=bWljaGFlbC5kZXdpdHRlQGdtYWlsLmNvbXw1NzIxMzE1fC0xMDg0MzQyNjg4&by-user=t>.
>
>
>
>
>   Message
>
>
>     No message yet
>
>
> [BroadcastServlet.java]
> ------------------------------------------------------------------------
> package be.forem.context.controller;
>
> import java.io.IOException;
> import java.util.Enumeration;
> import java.util.concurrent.Future;
>
> import javax.servlet.ServletException;
> import javax.servlet.http.HttpServlet;
> import javax.servlet.http.HttpServletRequest;
> import javax.servlet.http.HttpServletResponse;
>
> import org.atmosphere.cpr.BroadcasterFactory;
> import org.atmosphere.cpr.DefaultBroadcaster;
>
>
>
> public class BroadcastServlet extends HttpServlet {
> private static final long serialVersionUID = -298281051064936872L;
>
> /**
> * @see HttpServlet#doPut(HttpServletRequest, HttpServletResponse)
> */
> protected void doGet(HttpServletRequest request, HttpServletResponse
> response) throws ServletException, IOException {
> String username = request.getParameter("username");
> String message = request.getParameter("message");
> String broadcasterId = "/ContextPublisher/"+username;
> System.out.println("Broadcasting ("+message+") to : "+broadcasterId);
>
> try {
> BroadcasterFactory.getDefault().get(DefaultBroadcaster.class,
> broadcasterId).broadcast("<script
> type='text/javascript'>parent.setMsg('" + message + "')</script>\n");
> } catch (IllegalAccessException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> } catch (InstantiationException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
>
> }
>
> }
>
> ------------------------------------------------------------------------
>
> [ContextPublisher.java]
> ------------------------------------------------------------------------
> package be.forem.context.controller;
>
> import java.io.IOException;
> import java.util.Enumeration;
>
> import javax.servlet.ServletException;
> import javax.servlet.http.HttpServlet;
> import javax.servlet.http.HttpServletRequest;
> import javax.servlet.http.HttpServletResponse;
>
> import org.atmosphere.cpr.AtmosphereResourceEvent;
> import org.atmosphere.cpr.AtmosphereResourceEventListener;
> import org.atmosphere.cpr.BroadcasterFactory;
> import org.atmosphere.cpr.DefaultBroadcaster;
> import org.atmosphere.cpr.Meteor;
>
>
> public class ContextPublisher extends HttpServlet implements
> AtmosphereResourceEventListener{
>
> private static final long serialVersionUID = 1393519886718770628L;
>
> protected void doGet(HttpServletRequest request, HttpServletResponse
> response) throws ServletException, IOException {
> Meteor m = Meteor.build(request);
>
> try {
> //create an id for the broadcaster, based on the username
> String broadcasterId=request.getServletPath()+request.getPathInfo();
> System.out.println("Suspending response for broadcaster : "
> +broadcasterId);
>
> m.setBroadcaster(BroadcasterFactory.getDefault().get(DefaultBroadcaster.class,
> broadcasterId));
> m.addListener(this);
> request.getSession().setAttribute("meteor", m);
> m.suspend(-1);
> } catch (IllegalAccessException e) {
> e.printStackTrace();
> } catch (InstantiationException e) {
> e.printStackTrace();
> }
>
>
> }
>
> public void onBroadcast(AtmosphereResourceEvent
> <HttpServletRequest, HttpServletResponse> event) {
> System.out.println("onBroadcast");
>
> Meteor meteor = Meteor.lookup(event.getResource().getRequest());
>
> meteor.removeListener(this);
> meteor.resume();
>
> }
>
> public void onDisconnect(
> AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) {
> // TODO Auto-generated method stub
> System.out.println("onDisconnect");
>
> }
>
> public void onResume(
> AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) {
> // TODO Auto-generated method stub
> System.out.println("onResume");
>
> }
>
> public void onSuspend(
> AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> arg0) {
> // TODO Auto-generated method stub
> System.out.println("onSuspend");
>
> }
>
> }
>
> ------------------------------------------------------------------------
>
> ------------------------------------------------------------------------
> View this message in context: Re: Finding my way - push to individual
> users
> <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/Finding-my-way-push-to-individual-users-tp5721315p5743825.html>
> Sent from the Atmosphere users mailling list mailing list archive
> <http://atmosphere-users-mailling-list.2493822.n2.nabble.com/> at
> Nabble.com.
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

mikeD
For those who are also interested in this functionnality (I received a mail asking if I succeeded because someone was also trying to have kind of similar functionnality), I finally changed the strategy and ended with something working :

Finally, I ended with a working version as a POC. I based my work on the pubsub with jersey sample. I had to correct the fact it was with servlet 3. I had to correct the servlet mapping to be able to serve the html page. Then it worked.

At this point, I added another servlet whose purpose was only to receive a get from any other browser, with as parameter the desired topic to publish on and a message. In this servlet, there's only one line of code :

BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, request.getParameter("username")).broadcast(request.getParameter("message"));

And that's it ! This shows I'm able to send a broadcast to every browser a user has open with some iframe subscribing to the topic (equals his username) based on the code of the sample, just by using a call to the broadcastservlet I wrote or even just using the line of code above... which allows even to use jms to queue messages, or even start from a pojo to send the broadcast...

I don't know, though, how it will work in a cluster... i'll have to check in a near future...

Hope it helps a bit...
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
Salut,

Thanks for sharing...right now I'm swamped at work and I miss time to
give better help here.

On 10-11-30 7:11 AM, mikeD wrote:

>
> For those who are also interested in this functionnality (I received a mail
> asking if I succeeded because someone was also trying to have kind of
> similar functionnality), I finally changed the strategy and ended with
> something working :
>
> Finally, I ended with a working version as a POC. I based my work on the
> pubsub with jersey sample. I had to correct the fact it was with servlet 3.
> I had to correct the servlet mapping to be able to serve the html page. Then
> it worked.
>
> At this point, I added another servlet whose purpose was only to receive a
> get from any other browser, with as parameter the desired topic to publish
> on and a message. In this servlet, there's only one line of code :
>
> BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class,
> request.getParameter("username")).broadcast(request.getParameter("message"));
>
> And that's it ! This shows I'm able to send a broadcast to every browser a
> user has open with some iframe subscribing to the topic (equals his
> username) based on the code of the sample, just by using a call to the
> broadcastservlet I wrote or even just using the line of code above... which
> allows even to use jms to queue messages, or even start from a pojo to send
> the broadcast...
>
> I don't know, though, how it will work in a cluster... i'll have to check in
> a near future...

Take a look at:

   * http://is.gd/hZsTM
   * http://is.gd/hZsXn

You can easily cluster using Redis, JGroups, JMS or XMPP by either
choosing the proper BroadcastFilter or Broadcaster.

A+

- Jeanfrancois



>
> Hope it helps a bit...
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

jackparker
In reply to this post by mikeD
Oh, I see... you are using Atmosphere 'Broadcaster' to achieve messaging in a single JVM/Classloader scope.

So Atmosphere is really doing two jobs: a) a CPR to map Comet to various web servers,
and B) replacing a JMS or other messaging system.

I assume (B) is a pragmatic development because the first/obvious demo of Comet is for a Chat app.
But one could alternatively use JMS?
In that case, each web-user is associated with a 'Broadcaster' that is listening to a JMS topic or queue.
(and in this case of 'push to individual users', the topic/queue is named by the username)

Then, when a message is to be delivered to the user, one uses the username to lookup the JMS Destination
and send to that topic/queue, which triggers the Listener, which delivers the message to Atmosphere
(using AbstractBroadcastProxy.broadcastReceivedMessage(message) as in the JMSBroadcaster)

The advantage here is that the *sender* of the message does not need to know about Atmosphere,
and if you must span multiple servers/JVMs, you use the usual JMS/JNDI mechanisms.
Only the endpoint to the web-user needs to be linked to Atmosphere.

Is this a reasonable approach?


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

jackparker
Got it working with individual "Broadcaster" per user/session.

A few mods required:
0. minor: pingForStats() does not work when build/run from Eclipse, ${version} is not expanded.

1. AtmosphereServlet did not honor setting broadcasterClass in web.xml
   Fix: move the invocation of configureDefaultBroadcasterFactory() from the constructor
   and put it in init() *after* doInitParams(scFacade);
   [else the DefaultBroadcasterFactory is build using DefaultBroadcaster versus the given broadcasterClass]

3. There was a problem with [Chat]AtmosphereHandler writing to the response/resource.
   in AtmosphereResourceImpl.write(), getOutputStream() may be used,
   but later, AtmosphereHandler just uses getWriter() and thereupon fails.
   So, I added AtmosphereResource.write(String message) throws IOException,
   and implemented that with the existing AtmosphereResource.write()
   by adding the parameter, moving the try/catch block up to:
      if (flushComment) {
      try {
          write(beginCompatibleData);
      } catch (IOException ex) { loggger... }
      }
   And then replace res.getWriter().write(...)/flush() with resource.write(script);
   Now getWrite()/getOutputStream() are used consistently.
   

4. AtmosphereHandler.onRequest(resource) is invoked with the singleton 'resource' associated
   with the servlet's name; while that may be useful as a pan-application 'topic', we need more.
   The essential trick is to create and register a Broadcaster for each username/session.
   
   The new Broadcaster must be associated with the AtmosphereResourceImpl.
   The obvious way to do that was copy the code from AsynchronousProcessor:
        AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, broadcaster, req, res, this);
        req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re);

   That requires access to the appropriate CometSupport,
   so I added getCommetSupport() to the AtmosphereResource interface (and AtmosphereResourceImpl).
   This allows my AtmosphereHandler.onRequest(resource) to:
    cometSupport = resource.getCometSupport();
        config = resource.getAtmosphereConfig();
        AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, bc, req, res, cometSupport);
        req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re);
   Depending on the application requirements, one may cache that Broadcaster as necessary.
   [for me, I put in in the Session; if you have multiple session's per user,
    you may need to put it elsewhere, and attach each session/request to that Broadcaster]

5. The actual Broadcaster that I use creates a JMSMessageListener, the onMessage() of which
   extracts the message, and does broadcastReceiveMessage(message);
   The web application uses JMS to send messages to the queue/topic.
   Each user is listening to a dynamic queue named by the username (same as the Broadcaster name/id)
   and may listen to other topics as well. (TBD)

   This approach simplifies the usage of Atmosphere, as Atmosphere is now used only
   to supply the CometSupport to various protocols and app servers.
   And horizontal messaging is handled by JMS.
   Service 'RPC' requests from Http are managed by normal Spring @Controller dispatch,
   The AtmosphereServlet is just there to do async events to the client.
   Using Spring jmsTemplate, we just extract the destination from the REST url,
   (validate and secure, and find the senders username)
   and then jmsTemplate.convertAndSend(message);
   Presto: it comes out the onMessage() and is pushed to the client(s)!
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
In reply to this post by jackparker
Salut,

On 10-12-03 4:52 PM, jackparker wrote:
>
> Oh, I see... you are using Atmosphere 'Broadcaster' to achieve messaging in a
> single JVM/Classloader scope.
>
> So Atmosphere is really doing two jobs: a) a CPR to map Comet to various web
> servers,
> and B) replacing a JMS or other messaging system.

Not exactly. Atmosphere has a Broadcaster concept that can be
implemented as a JMS queue/topic.

>
> I assume (B) is a pragmatic development because the first/obvious demo of
> Comet is for a Chat app.
> But one could alternatively use JMS?
> In that case, each web-user is associated with a 'Broadcaster' that is
> listening to a JMS topic or queue.
> (and in this case of 'push to individual users', the topic/queue is named by
> the username)
>
> Then, when a message is to be delivered to the user, one uses the username
> to lookup the JMS Destination
> and send to that topic/queue, which triggers the Listener, which delivers
> the message to Atmosphere
> (using AbstractBroadcastProxy.broadcastReceivedMessage(message) as in the
> JMSBroadcaster)
>
> The advantage here is that the *sender* of the message does not need to know
> about Atmosphere,
> and if you must span multiple servers/JVMs, you use the usual JMS/JNDI
> mechanisms.
> Only the endpoint to the web-user needs to be linked to Atmosphere.
>
> Is this a reasonable approach?

Yes it is. Currently we do support:

  + JMS
  + Cluster
  + Redis
  + XMPP

(those will ship in 0.7). They have respective Broadcaster
implementation that can be used to exactly achieve what you described above.

A+

- Jeanfrancois






>
>
>
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
In reply to this post by jackparker
Salut,

first, many thanks for the feedback.

On 10-12-06 1:38 AM, jackparker wrote:
>
> Got it working with individual "Broadcaster" per user/session.
>
> A few mods required:
> 0. minor: pingForStats() does not work when build/run from Eclipse,
> ${version} is not expanded.

Hum...really strange. I don't use Eclipse. Can you paste the log.

>
> 1. AtmosphereServlet did not honor setting broadcasterClass in web.xml
>     Fix: move the invocation of configureDefaultBroadcasterFactory() from the
> constructor
>     and put it in init() *after* doInitParams(scFacade);
>     [else the DefaultBroadcasterFactory is build using DefaultBroadcaster
> versus the given broadcasterClass]

Let me check that. This is a regression for sure.


>
> 3. There was a problem with [Chat]AtmosphereHandler writing to the
> response/resource.
>     in AtmosphereResourceImpl.write(), getOutputStream() may be used,
>     but later, AtmosphereHandler just uses getWriter() and thereupon fails.
>     So, I added AtmosphereResource.write(String message) throws IOException,
>     and implemented that with the existing AtmosphereResource.write()
>     by adding the parameter, moving the try/catch block up to:
>        if (flushComment) {
>        try {
>            write(beginCompatibleData);
>        } catch (IOException ex) { loggger... }
>        }
>     And then replace res.getWriter().write(...)/flush() with
> resource.write(script);
>     Now getWrite()/getOutputStream() are used consistently.

Can you paste the diff here? I'm not aware of any problems like that.
There is also a property called:

    org.atmosphere.useStream

that can be set to force application to only use stream or writer.


>
>
> 4. AtmosphereHandler.onRequest(resource) is invoked with the singleton
> 'resource' associated
>     with the servlet's name; while that may be useful as a pan-application
> 'topic', we need more.
>     The essential trick is to create and register a Broadcaster for each
> username/session.
>
>     The new Broadcaster must be associated with the AtmosphereResourceImpl.
>     The obvious way to do that was copy the code from AsynchronousProcessor:
>          AtmosphereResourceImpl re = new AtmosphereResourceImpl(config,
> broadcaster, req, res, this);
>          req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re);
>
>     That requires access to the appropriate CometSupport,
>     so I added getCommetSupport() to the AtmosphereResource interface (and
> AtmosphereResourceImpl).
>     This allows my AtmosphereHandler.onRequest(resource) to:
>     cometSupport = resource.getCometSupport();
> config = resource.getAtmosphereConfig();
> AtmosphereResourceImpl re = new AtmosphereResourceImpl(config, bc, req,
> res, cometSupport);
> req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, re);

Why not doing AtmosphereResourceImpl.setBroadcaster(...) directly
instead of creating at AtmosphereResourceImpl?


>     Depending on the application requirements, one may cache that Broadcaster
> as necessary.
>     [for me, I put in in the Session; if you have multiple session's per
> user,
>      you may need to put it elsewhere, and attach each session/request to
> that Broadcaster]

Did you see the BroadcasterFactory? That API can also help when it is
time to cache Broadcaster:

    http://is.gd/ihYb4

Again, share you diff :-) I'm not against the change, but I need to see
it to make sure there is no regression with previous version.


>
> 5. The actual Broadcaster that I use creates a JMSMessageListener, the
> onMessage() of which
>     extracts the message, and does broadcastReceiveMessage(message);
>     The web application uses JMS to send messages to the queue/topic.
>     Each user is listening to a dynamic queue named by the username (same as
> the Broadcaster name/id)
>     and may listen to other topics as well. (TBD)
>
>     This approach simplifies the usage of Atmosphere, as Atmosphere is now
> used only
>     to supply the CometSupport to various protocols and app servers.
>     And horizontal messaging is handled by JMS.
>     Service 'RPC' requests from Http are managed by normal Spring @Controller
> dispatch,
>     The AtmosphereServlet is just there to do async events to the client.
>     Using Spring jmsTemplate, we just extract the destination from the REST
> url,
>     (validate and secure, and find the senders username)
>     and then jmsTemplate.convertAndSend(message);
>     Presto: it comes out the onMessage() and is pushed to the client(s)!

This is great! Again, if you can share your changes so I can see if I
can integrate them.

Thanks!

-- Jeanfrancois




>
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

jackparker
Jeanfrancois,
   Thanks for the informative response.
Indeed, using setBroadcaster() on the existing Resource works!
And if I do my own BroadcasterFactory, that would obviate the need for that.
[that is: the BroadcasterFactory should inject/create the correct Broadcaster *before* giving it to the AtmosphereHander]

0: I have excluded atmosphere-ping so the problem is gone, but the essence was that because
an illegal character in the URL at Line-66:
URI.create(String.format("<a href="http://jfarcand.wordpress.com/ping-atmosphere-%s">http://jfarcand.wordpress.com/ping-atmosphere-%s", version)).toURL()
because version was something like "${pom.version}"
That is: the property file had not been filter/interpolated.
This was only a problem because Eclipse running tomcat in debug mode wants to 'debug' the Exception that is thrown, and so would block.
I was tempted to catch and handle better but just excluded atmosphere-ping.jar

The problem with getWrite()/getOutputStream() is in the old version of ChatAtmosphereHandler,
where it is hard-coded to use res.getWriter().write(...)/flush();
The only code I could find that checks PROPERTY_USE_STREAM was the write() method in AtmosphereResourceImpl
So I hacked that to make it possible for the "Application" to use the same code to output the real data,
as is used by the framework to send the flushComment/beginCompatibleData.

:atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
index 6a9abd7..5200a01
--- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
+++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
@@ -174,6 +174,15 @@ public interface AtmosphereResource<E, F> {
     public void write(OutputStream os, Object o) throws IOException;
 
     /**
+     * Write the {@link String} using the {@link OutputStream} or {@link ServletWriter}
+     * depending on the setting of org.atmosphere.useStream from web.xml init-params.
+     *
+     * @param str  {@link String}
+     * @throws java.io.IOException
+     */
+    public void write(String str) throws IOException;
+
+    /**
      * Get the {@link Serializer} or null if not defined.
      *
      * @return the {@link Serializer} or null if not defined.

:atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
index 5e97729..b6fd700
--- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
+++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
@@ -201,7 +201,11 @@ public class AtmosphereResourceImpl implements
             }
 
             if (flushComment) {
-                write();
+                try {
+                    write(beginCompatibleData);
+                } catch (Throwable ex) {
+                    LoggerUtils.getLogger().log(Level.WARNING, "", ex);
+                }
             }
             req.setAttribute(PRE_SUSPEND, "true");
             action.type = AtmosphereServlet.Action.TYPE.SUSPEND;
@@ -224,8 +228,13 @@ public class AtmosphereResourceImpl implements
         }
     }
 
-    void write() {
-        try {
+    /**
+     * write to this Response, using getWriter() or getOutputStream as indicated by useWriter
+     * or property PROPERTY_USE_STREAM=org.atmosphere.useStream;
+     */
+    @Override
+    public void write(String message) throws IOException {
+//         try {
             if (useWriter && !((Boolean) req.getAttribute(AtmosphereServlet.PROPERTY_USE_STREAM))) {
                 try {
                     res.getWriter();
@@ -233,7 +242,7 @@ public class AtmosphereResourceImpl implements
                     return;
                 }
 
-                res.getWriter().write(beginCompatibleData);
+                res.getWriter().write(message);
                 res.getWriter().flush();
             } else {
                 try {
@@ -246,9 +255,9 @@ public class AtmosphereResourceImpl implements
                 res.getOutputStream().flush();
             }
 
-        } catch (Throwable ex) {
-            LoggerUtils.getLogger().log(Level.WARNING, "", ex);
-        }
+//         } catch (Throwable ex) {
+//             LoggerUtils.getLogger().log(Level.WARNING, "", ex);
+//         }
     }
 
 
-----------------------------
Here is the diff for configureDefaultBroadcasterFactory for 0.7-SNAPSHOT of ~ 24/Nov/2010.
[blobs 1 & 3 are the actual patch, blobs 2 & 4 are just to make stepping in the debugger easier.]

:atmosphere>git diff modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
index 80ee851..0c2989e
--- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
+++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
@@ -396,7 +396,6 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
     public AtmosphereServlet(boolean isFilter) {
         this.isFilter = isFilter;
         readSystemProperties();
-        configureDefaultBroadcasterFactory();
         populateBroadcasterType();
     }
 
@@ -412,7 +411,9 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
         } catch (ClassNotFoundException e) {
             LoggerUtils.getLogger().log(Level.SEVERE,"",e);
         }
-        BroadcasterFactory.setBroadcasterFactory(new DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b), config);
+        Class bc = (b == null ? DefaultBroadcaster.class : b);
+        logger.log(Level.INFO, "configureDefaultBroadcasterFactor using class: "+bc);
+        BroadcasterFactory.setBroadcasterFactory(new DefaultBroadcasterFactory(bc), config);
     }
 
 
@@ -563,6 +564,7 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
             };
             pingForStats();
             doInitParams(scFacade);
+            configureDefaultBroadcasterFactory();
             detectGoogleAppEngine(scFacade);
             loadConfiguration(scFacade);
 
@@ -583,8 +585,8 @@ public class AtmosphereServlet extends AbstractAsyncServlet implements CometProc
     protected void configureBroadcaster() throws ClassNotFoundException, InstantiationException, IllegalAccessException {
 
         if (broadcasterFactory == null) {
-            broadcasterFactory = new DefaultBroadcasterFactory((Class<Broadcaster>)
-                    Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName));
+            Class bc = (Class<Broadcaster>) Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName);
+            broadcasterFactory = new DefaultBroadcasterFactory(bc);
             config.broadcasterFactory = broadcasterFactory;
         }
 
When I get my BroadcasterFactory working, and extends my JMSBroadcaster to include multiple topic/queue
i'll send that; it may be a few weeks :)
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
Salut,

On 10-12-06 1:52 PM, jackparker wrote:
>
> Jeanfrancois,
>     Thanks for the informative response.
> Indeed, using setBroadcaster() on the existing Resource works!
> And if I do my own BroadcasterFactory, that would obviate the need for that.
> [that is: the BroadcasterFactory should inject/create the correct
> Broadcaster *before* giving it to the AtmosphereHander]

Currently I don't think you can do that unless I add some Filter concept.


>
> 0: I have excluded atmosphere-ping so the problem is gone, but the essence
> was that because
> an illegal character in the URL at Line-66:
> URI.create(String.format("<a href="http://jfarcand.wordpress.com/ping-atmosphere-%s">http://jfarcand.wordpress.com/ping-atmosphere-%s",
> version)).toURL()
> because version was something like "${pom.version}"
> That is: the property file had not been filter/interpolated.
> This was only a problem because Eclipse running tomcat in debug mode wants
> to 'debug' the Exception that is thrown, and so would block.
> I was tempted to catch and handle better but just excluded
> atmosphere-ping.jar

I see. Strange Eclipse.


>
> The problem with getWrite()/getOutputStream() is in the old version of
> ChatAtmosphereHandler,
> where it is hard-coded to use res.getWriter().write(...)/flush();
> The only code I could find that checks PROPERTY_USE_STREAM was the write()
> method in AtmosphereResourceImpl
> So I hacked that to make it possible for the "Application" to use the same
> code to output the real data,
> as is used by the framework to send the flushComment/beginCompatibleData.
>
> :atmosphere>git diff
> modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
> diff --git
> a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
> b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
> index 6a9abd7..5200a01
> --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
> +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java
> @@ -174,6 +174,15 @@ public interface AtmosphereResource<E, F>  {
>       public void write(OutputStream os, Object o) throws IOException;
>
>       /**
> +     * Write the {@link String} using the {@link OutputStream} or {@link
> ServletWriter}
> +     * depending on the setting of org.atmosphere.useStream from web.xml
> init-params.
> +     *
> +     * @param str  {@link String}
> +     * @throws java.io.IOException
> +     */
> +    public void write(String str) throws IOException;
> +
> +    /**
>        * Get the {@link Serializer} or null if not defined.
>        *
>        * @return the {@link Serializer} or null if not defined.
>
> :atmosphere>git diff
> modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
> diff --git
> a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
> b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
> index 5e97729..b6fd700
> ---
> a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
> +++
> b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
> @@ -201,7 +201,11 @@ public class AtmosphereResourceImpl implements
>               }
>
>               if (flushComment) {
> -                write();
> +                try {
> +                    write(beginCompatibleData);
> +                } catch (Throwable ex) {
> +                    LoggerUtils.getLogger().log(Level.WARNING, "", ex);
> +                }
>               }
>               req.setAttribute(PRE_SUSPEND, "true");
>               action.type = AtmosphereServlet.Action.TYPE.SUSPEND;
> @@ -224,8 +228,13 @@ public class AtmosphereResourceImpl implements
>           }
>       }
>
> -    void write() {
> -        try {
> +    /**
> +     * write to this Response, using getWriter() or getOutputStream as
> indicated by useWriter
> +     * or property PROPERTY_USE_STREAM=org.atmosphere.useStream;
> +     */
> +    @Override
> +    public void write(String message) throws IOException {
> +//         try {
>               if (useWriter&&  !((Boolean)
> req.getAttribute(AtmosphereServlet.PROPERTY_USE_STREAM))) {
>                   try {
>                       res.getWriter();
> @@ -233,7 +242,7 @@ public class AtmosphereResourceImpl implements
>                       return;
>                   }
>
> -                res.getWriter().write(beginCompatibleData);
> +                res.getWriter().write(message);
>                   res.getWriter().flush();
>               } else {
>                   try {
> @@ -246,9 +255,9 @@ public class AtmosphereResourceImpl implements
>                   res.getOutputStream().flush();
>               }
>
> -        } catch (Throwable ex) {
> -            LoggerUtils.getLogger().log(Level.WARNING, "", ex);
> -        }
> +//         } catch (Throwable ex) {
> +//             LoggerUtils.getLogger().log(Level.WARNING, "", ex);
> +//         }
>       }

I see. Let me think of it, but I'm not against your idea.


>
>
> -----------------------------
> Here is the diff for configureDefaultBroadcasterFactory for 0.7-SNAPSHOT of
> ~ 24/Nov/2010.
> [blobs 1&  3 are the actual patch, blobs 2&  4 are just to make stepping in
> the debugger easier.]
>
> :atmosphere>git diff
> modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
> diff --git
> a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
> b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
> index 80ee851..0c2989e
> --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
> +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java
> @@ -396,7 +396,6 @@ public class AtmosphereServlet extends
> AbstractAsyncServlet implements CometProc
>       public AtmosphereServlet(boolean isFilter) {
>           this.isFilter = isFilter;
>           readSystemProperties();
> -        configureDefaultBroadcasterFactory();
>           populateBroadcasterType();
>       }
>
> @@ -412,7 +411,9 @@ public class AtmosphereServlet extends
> AbstractAsyncServlet implements CometProc
>           } catch (ClassNotFoundException e) {
>               LoggerUtils.getLogger().log(Level.SEVERE,"",e);
>           }
> -        BroadcasterFactory.setBroadcasterFactory(new
> DefaultBroadcasterFactory(b == null ? DefaultBroadcaster.class : b),
> config);
> +        Class bc = (b == null ? DefaultBroadcaster.class : b);
> +        logger.log(Level.INFO, "configureDefaultBroadcasterFactor using
> class: "+bc);
> +        BroadcasterFactory.setBroadcasterFactory(new
> DefaultBroadcasterFactory(bc), config);
>       }
>
>
> @@ -563,6 +564,7 @@ public class AtmosphereServlet extends
> AbstractAsyncServlet implements CometProc
>               };
>               pingForStats();
>               doInitParams(scFacade);
> +            configureDefaultBroadcasterFactory();
>               detectGoogleAppEngine(scFacade);
>               loadConfiguration(scFacade);
>
> @@ -583,8 +585,8 @@ public class AtmosphereServlet extends
> AbstractAsyncServlet implements CometProc
>       protected void configureBroadcaster() throws ClassNotFoundException,
> InstantiationException, IllegalAccessException {
>
>           if (broadcasterFactory == null) {
> -            broadcasterFactory = new
> DefaultBroadcasterFactory((Class<Broadcaster>)
> -
> Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName));
> +            Class bc = (Class<Broadcaster>)
> Thread.currentThread().getContextClassLoader().loadClass(broadcasterClassName);
> +            broadcasterFactory = new DefaultBroadcasterFactory(bc);
>               config.broadcasterFactory = broadcasterFactory;
>           }
>
> When I get my BroadcasterFactory working, and extends my JMSBroadcaster to
> include multiple topic/queue
> i'll send that; it may be a few weeks :)

Thanks a lot for the feedback. I will apply the patch above after I've
ran all the tests.

Thanks!

-- Jeanfrancois



>
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

jackparker
jfarcand-3 wrote
Salut,

> And if I do my own BroadcasterFactory, that would obviate the need for that.
> [that is: the BroadcasterFactory should inject/create the correct
> Broadcaster *before* giving it to the AtmosphereHander]

Currently I don't think you can do that unless I add some Filter concept.
Agreed. would need some pluggable strategy to replace the map(servletPath)

However, this is not not really needed. Now that I see what the DefaultBroadcasterFactory does,
it took just a few hacks to my constructor/setID (to ignore the initial "default" id) and now
(with thanks to Mike, the OP, for his 'one-liner') this works in MyAtomspherHander.onRequest:
            // resource.getBroadcaster() = map("/{servletPath}"); but we want the per-user Broadcaster
            String path = req.getServletPath();
            String bcid = path+"/"+session.getUsername(); // also becomes the JMS Queue name
            BroadcasterFactory bf = BroadcasterFactory.getDefault();
            Broadcaster bc = bf.lookup(MyJMSBroadcaster.class, bcid, true);  // get or create/register MyJMSBroadcaster(bcid);
            bc.getBroadcasterConfig().addFilter(xssHtmlFilter);

            resource.setBroadcaster(bc); // add Broadcaster to resource
            resource.suspend();          // add resource to (Broadcaster)bc!

As Mike said, all sessions/requests/Resources with the same username
a) find the same Broadcaster, and B) are in the 'resources' list of that same Broadcaster
So if any message comes to that Broadcaster, all sessions for that user are notified.
MyJMSBroadcaster creates and listens to a dynamic JMS queue (using ApacheMQ)
messages can be put in queue bc.broadcast(message) which wraps jmsTemplate.convertAndSend()
or (more likely) by calling JMS directly without reference to Atmosphere.


ps, one more tweak to your demo code:
doing addFilter(new XSSHtmlFilter()) continues to stack more filters,
because the 'new' filter is not found in the collection; so define a singleton xssHtmlFilter.



Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Finding my way - push to individual users

Jeanfrancois Arcand-4
Salut,

On 10-12-06 7:57 PM, jackparker wrote:

>
>
> jfarcand-3 wrote:
>>
>> Salut,
>>
>>> And if I do my own BroadcasterFactory, that would obviate the need for
>>> that.
>>> [that is: the BroadcasterFactory should inject/create the correct
>>> Broadcaster *before* giving it to the AtmosphereHander]
>>
>> Currently I don't think you can do that unless I add some Filter concept.
>>
> Agreed. would need some pluggable strategy to replace the map(servletPath)

Indeed.

>
> However, this is not not really needed. Now that I see what the
> DefaultBroadcasterFactory does,
> it took just a few hacks to my constructor/setID (to ignore the initial
> "default" id) and now
> (with thanks to Mike, the OP, for his 'one-liner') this works in
> MyAtomspherHander.onRequest:
>              // resource.getBroadcaster() = map("/{servletPath}"); but we
> want the per-user Broadcaster
>              String path = req.getServletPath();
>              String bcid = path+"/"+session.getUsername(); // also becomes
> the JMS Queue name
>              BroadcasterFactory bf = BroadcasterFactory.getDefault();
>              Broadcaster bc = bf.lookup(MyJMSBroadcaster.class, bcid, true);
> // get or create/register MyJMSBroadcaster(bcid);
>              bc.getBroadcasterConfig().addFilter(xssHtmlFilter);
>
>              resource.setBroadcaster(bc); // add Broadcaster to resource
>              resource.suspend();          // add resource to (Broadcaster)bc!
>
> As Mike said, all sessions/requests/Resources with the same username
> a) find the same Broadcaster, and B) are in the 'resources' list of that
> same Broadcaster
> So if any message comes to that Broadcaster, all sessions for that user are
> notified.
> MyJMSBroadcaster creates and listens to a dynamic JMS queue (using ApacheMQ)
> messages can be put in queue bc.broadcast(message) which wraps
> jmsTemplate.convertAndSend()
> or (more likely) by calling JMS directly without reference to Atmosphere.

Ok the above make sense.


>
>
> ps, one more tweak to your demo code:
> doing addFilter(new XSSHtmlFilter()) continues to stack more filters,
> because the 'new' filter is not found in the collection; so define a
> singleton xssHtmlFilter.

Hum...sound like a regression as the idea is to not allow duplicate.
Looking....

FYI I've applied the fix for the custom Broadcaster

https://github.com/Atmosphere/atmosphere/commit/29ba639120937677d6121f226587dd893acc2879 


Thanks!

-- Jeanfrancois



>
>
>
>
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Using JMS to push to individual users

jackparker
For the record, here is MyJMSBroadcaster (using Spring JmsTemplate, but that could be changed)
I restructrured a bit from the JMSBroadcaster...
I also had to hack around the super constructor's setID();

package my.pkg.comet;

import my.pkg.domain.Member; // Spring/roo generated class

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.atmosphere.util.AbstractBroadcasterProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;

/**
 * Broadcaster is used by some [My]AtmosphereHandler?
 * AtmosphereHandler and Broadcaster comprise a AtmosphereHandlerWrapper.
 * which is registered in AtmosphereServlet.handlers() by the Broadcaster.getID() [mapBroadcasterToAtmosphereHandler()]
 * [ also mapped from (servlet-path!, ahw); maybe bc.getID() is a servlet-path?? ]
 */
public class MyJMSBroadcaster extends AbstractBroadcasterProxy {
        private static final String TEMP_NAME = "MyJMSBroadcaster.unnamed";

        static private JmsTemplate staticJmsTemplate;
        static public QueueConnectionFactory getQueueConnectionFactory() {
                return (QueueConnectionFactory)staticJmsTemplate.getConnectionFactory();
        }

        // roundabout way for Spring to inject: static staticJmsTemplate = jmsTemplate;
        public static class JmsTemplateBean {
                public JmsTemplateBean(JmsTemplate jmsTemplate) {
                        staticJmsTemplate = jmsTemplate;
                }
        }

        // inner-class, with binding through pushIfAble() to broadcastRecievedMessage()
        public class JMSListener implements MessageListener {
                // sendMessage()/jmsConvertAndSend() is delivered here,
                // unwrap from JMS and push the message to the waiting client via comet response
                // message formatting/sending is done in MyAtmosphereHandler.onStateChange
                @Override
                public void onMessage(Message msg) {
                        try {
                                TextMessage textMessage = (TextMessage) msg;
                                String message = textMessage.getText();
                                MyJMSBroadcaster.this.pushIfAble(message);

                        } catch (JMSException ex) {
                                log.warn("JMSListener failed to push message: "+msg, ex);
                        }
                }
        }

    private final Logger log = LoggerFactory.getLogger(MyJMSBroadcaster.class);
    private QueueConnection qconnection;
    private QueueSession qsession;
        private Queue queue;
    private QueueReceiver qreceiver;
    private QueueSender qsender;
        private MessageListener messageListener = new JMSListener();

        // interface between JMSListener and MyJMSBroadcaster:
        private void pushIfAble(Object message) {
                if (message != null && bc != null) {
                        log.detail("pushIfAble: resources={}, message={}",resources, message);
                        broadcastReceivedMessage(message);
                        // push(new Entry(filter(newMsg), null, new BroadcasterFuture(newMsg), message));
                        // resources[].push(message)->onStateChange()->resource.write(format(message))
                }
        }

        private void updateConnections() {
                String queueId = getQueueName();
                if (queueId.equals(TEMP_NAME))
                        return; // take no action until setID() with real/final name
                try {
                        qconnection = getConnectionFactory().createQueueConnection();
                        qsession = qconnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                        // create a non-portable "queue identity"
                        // or use jndiContext.lookup("dynamicQueues/"+getQueueName())
                        // if (queue != null) queue.close();
                        if (qsender != null) qsender.close();
                        queue = qsession.createQueue(queueId);
            qsender = qsession.createSender(queue); // mostly unused in My
                        setMessageSelector(msgSelector);
                } catch (JMSException ex) {
                        log.warn("Could not update Connection/Queue", ex);
                        throw new IllegalStateException("Could not update Connection/Queue", ex);
                }
        }

        private QueueConnectionFactory connectionFactory;
        public QueueConnectionFactory getConnectionFactory() { return this.connectionFactory; }
    @Autowired
        public void setConnectionFactory(QueueConnectionFactory connectionFactory) {
                this.connectionFactory = connectionFactory;
                updateConnections(); // sets qreceiver, et al.
                start();
        }

        private String msgSelector = ""; // empty selector same as null
        public String getMessageSelector() { return this.msgSelector; }
        public void setMessageSelector(String msgSelector) {
                this.msgSelector = msgSelector; // we could test for actual change of msgSelector...
                setListeners(); // update the qreceiver & setListeners:
        }

        // Super-class [AbstractBroadcasterProxy] invokes start() in its constructor,
        // but that is before the connnetionFactory can be set.
        // [constructor injection would not help, as super(id) must be invoked before setting local fields]
        // so we interpose here to avoid setting the one-shot 'started' AtomicBoolean
        // and re-invoke start() after/when connectionFactory is set;
    protected void start() {
                if (getConnectionFactory() == null) {
                        return; // try again later...
                }
                super.start(); // set started, create BroadcastHandler and setListeners
        }

        private String getQueueName() {
                return getID(); // use id as queue name...
                // assert: (getID() == TEMP_NAME ? getQueueName() == TEMP_NAME : true)
        }

        // convenience method so we can keep queue name logic in one file.
        // send message to member on the Servlet(req)
        /**
         * Send a Comet event to a client application.
         * <p>
         * If you already know the client's queue name, you can:
         * <code>jmsTemplate.convertAndSend(queueName, message)</code>
         *
         * @param path the servletPath used by client comet request (identifies the application or domain)
         * @param member a myDS-authenticated Member, member.getUsername() is used.
         * @param message the String to be sent to the client as a comet event
         */
        public static void sendMessage(String path, Member member, String message) {
                String queueName = getQueueName(path, member);
                staticJmsTemplate.convertAndSend(queueName, message);
        }

        // create a receiver/listener for messages to this member on the Servlet(req)
        public static MyJMSBroadcaster newMyJMSBroadcaster(String path, Member member) {
                String queueName = getQueueName(path, member);
                return new MyJMSBroadcaster(queueName);
        }

        private static String getQueueName(String path, Member member) {
                //String path = req.getServletPath();
                return path+"/"+member.getUsername();
        }

        @Override
        public void setID(String id) {
                super.setID(id);
                updateConnections();
        }

    public MyJMSBroadcaster() {
        this(TEMP_NAME);
    }

    public MyJMSBroadcaster(String id) {
                // we would rather that Spring could inject ConnectionFactory, but for now we use self-service:
                // move ConnectionFactory from lastConnectionFactoryHolder to this:
                this(id, getQueueConnectionFactory());
    }

        // if we can ever create this Broadcaster from Spring, supply the ConnectionFactory to be used:
        /**
         * @param id for broadcaster.getID()
         * @param queueConnectionFactory typically a cachingConnectionFactory from Spring.
         */
    public MyJMSBroadcaster(String id, QueueConnectionFactory queueConnectionFactory) {
                super(id);   // set name, setID, empty BroadcasterCache, new BroadcasterConfig
                setConnectionFactory(queueConnectionFactory);
        }

        // might generalize this to listen to user's Queue and application's Topic
        // Q: is it better to have A) 1 "ALL" topic + 1000s of per-user queues + subtopic queues
        // or B) 1 APP queue with selection filters for "TO=${me} || TO=${ALL} || TO=${TOPIC-7}"
        // if we can pre-filter when the target group is small (and so not send to each machine)
        // then plan B is simpler and more flexible.
        // else the sender needs to expand the 'group-members-list'
        // A) setSelector(String str...) { createReceiver(getQueueByName(str)).addListener();... }
        // B) setSelector(String str...) { msgSelector += "TO=$str || ..."; getQ().createRec(mS).addListener(); }
        // We like (A); just send the DestinationNames (queue://user, topic://all, topic://game1)
        private void setListeners() {
                try {
                        QueueReceiver qr = qreceiver; // save until new qreceiver is configured
                        qreceiver = qsession.createReceiver(queue, msgSelector);
                        if (qr != null)
                                qr.close(); // kill old qreceiver;

                        qreceiver.setMessageListener(messageListener);
                        log.detail("set MessageListener for queue {}", queue);
        } catch (JMSException ex) {
                        log.warn("Could not update MessageListener for "+queue, ex);
            throw new IllegalStateException("Could not create MessageListener for "+queue, ex);
        }
        }
   
        // incomingBroadcast is the 'run()' method of the resulting BroadcastHandler:
        // invoked by ExecutorService when Broadcaster.start() {
        //    bc.getExecutorService().submit(getBroadcastHandler());
        // }
        // start() is called in ABP.constructor(id) [before msgSelector is set!]
        // rely that 'this.incomingBroadcast() is blocked in Executor until constructor has completed?!
        // note to self: maybe select on JMSTimestamp
        //
        // DefaultBroadcaster is way convoluted...
        // each call to broadcast(msg) calls: {start(); messages.offer(new Entry(newMsg,null,future,msg); return future;}
        // start() has a one-shot that calls executorService.submit(getBroadcastHandler())
        //     getBroadcastHandler() [misnomer] returns a Runnable [which is interesting for DefaultBroadcaster
        //     because it continues to re-submit itself to push(messages.take())]
        //     Hmm: the Entry msg holds a [Broadcaster]Future, and that future is done/canceled.
        //
        // push(Entry) iterates multiple resources on the Broadcaster; delivering to push(resource, finalMsg)
        // push(r,msg) checks lifecycle, then calls: broadcast(r,e(r,msg))
        // broadcast(r,e) finds the AtmosphereHandler and calls AtmosphereHandler.onStateChange(e);
        // presumably each Resource to which one is broadcasting can have its own Handler.
        // onStateChange(e) does e.resource.write(toJson(message))

        // but: once the JMS conection is started (which AbstractBroadcasterProxy does in contstuctor)
        // [we finesse all that, just doing staticJmsTemplate.convertAndSend(msg)]
        //
        // when a message is received from JMS we call: broadcastReceivedMessage(message)
        // broadcastReceivedMessage(message) calls push(new Entry(filter(message), null, Future, message));
        // push(Entry) -> push(r,msg) -> broadcast(r,e)->handler.onStateChange(e)->e.r.write(toJson(message))
        // BUT: AbstractBroadcasterProxy sets writeLocally=false, so nothing is emitted!

    /**
     * {@inheritDoc}
     */
    @Override
    public void incomingBroadcast() {
        try {
            qconnection.start();
        } catch (JMSException ex) {
                        log.warn("Unable to initialize BroadcastHandler", ex);
            throw new IllegalStateException("Unable to initialize BroadcastHandler", ex);
        }
    }

        // AbstractBroadcasterProxy.broadcast(...) comes here; the subsequent push(Entry) has little effect

        // Forward message to this queue/topic/destination Listeners [ie: this.messageListener]
        // where it emerges on the set of resources assigned to this Broadcaster.
        // The application uses JmsTemplate to send to queue/topic. [MyJMSBroadcaster.sendMessage()]
        // Note: the Chat/demo code may call bc.broadcast("suspended") or some such
    /**
     * {@inheritDoc}
     */
    @Override
    public void outgoingBroadcast(final Object message) {
        try {
                        // Note: JmsTemplate creates a producer for each send! finally{closeProducer()}
// staticJmsTemplate.send(queue, new MessageCreator() {
// Message createMessage(Session session) {
// return session.createTextMessage(message.toString());
// }
// });
            qsender.send(qsession.createTextMessage(message.toString()));
                        //if (qos) producer.send(message, deliveryMode, priority, ttl);
        } catch (JMSException ex) {
                        log.warn("Unable to send outgoingBroadcast", ex);
        }
    }
}
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Using JMS to push to individual users

Jeanfrancois Arcand-4
Salut,

thanks for sharing the code. I've first merged Christopher's proposal
(and exposed AtmosphereConfig to Broadcaster so init param can be used):

   http://is.gd/irsnA

will it cover your use case as well? If not can you tell me what's missing?

Thanks!

-- Jeanfrancois

On 10-12-08 8:16 PM, jackparker wrote:

>
> For the record, here is MyJMSBroadcaster (using Spring JmsTemplate, but that
> could be changed)
> I restructrured a bit from the JMSBroadcaster...
> I also had to hack around the super constructor's setID();
>
> package my.pkg.comet;
>
> import my.pkg.domain.Member; // Spring/roo generated class
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueReceiver;
> import javax.jms.QueueSender;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.atmosphere.util.AbstractBroadcasterProxy;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.springframework.beans.factory.annotation.Autowired;
> import org.springframework.jms.core.JmsTemplate;
>
> /**
>   * Broadcaster is used by some [My]AtmosphereHandler?
>   * AtmosphereHandler and Broadcaster comprise a AtmosphereHandlerWrapper.
>   * which is registered in AtmosphereServlet.handlers() by the
> Broadcaster.getID() [mapBroadcasterToAtmosphereHandler()]
>   * [ also mapped from (servlet-path!, ahw); maybe bc.getID() is a
> servlet-path?? ]
>   */
> public class MyJMSBroadcaster extends AbstractBroadcasterProxy {
> private static final String TEMP_NAME = "MyJMSBroadcaster.unnamed";
>
> static private JmsTemplate staticJmsTemplate;
> static public QueueConnectionFactory getQueueConnectionFactory() {
> return (QueueConnectionFactory)staticJmsTemplate.getConnectionFactory();
> }
>
> // roundabout way for Spring to inject: static staticJmsTemplate =
> jmsTemplate;
> public static class JmsTemplateBean {
> public JmsTemplateBean(JmsTemplate jmsTemplate) {
> staticJmsTemplate = jmsTemplate;
> }
> }
>
> // inner-class, with binding through pushIfAble() to
> broadcastRecievedMessage()
> public class JMSListener implements MessageListener {
> // sendMessage()/jmsConvertAndSend() is delivered here,
> // unwrap from JMS and push the message to the waiting client via comet
> response
> // message formatting/sending is done in MyAtmosphereHandler.onStateChange
> @Override
> public void onMessage(Message msg) {
> try {
> TextMessage textMessage = (TextMessage) msg;
> String message = textMessage.getText();
> MyJMSBroadcaster.this.pushIfAble(message);
>
> } catch (JMSException ex) {
> log.warn("JMSListener failed to push message: "+msg, ex);
> }
> }
> }
>
>      private final Logger log =
> LoggerFactory.getLogger(MyJMSBroadcaster.class);
>      private QueueConnection qconnection;
>      private QueueSession qsession;
> private Queue queue;
>      private QueueReceiver qreceiver;
>      private QueueSender qsender;
> private MessageListener messageListener = new JMSListener();
>
> // interface between JMSListener and MyJMSBroadcaster:
> private void pushIfAble(Object message) {
> if (message != null&&  bc != null) {
> log.detail("pushIfAble: resources={}, message={}",resources, message);
> broadcastReceivedMessage(message);
> // push(new Entry(filter(newMsg), null, new BroadcasterFuture(newMsg),
> message));
> //
> resources[].push(message)->onStateChange()->resource.write(format(message))
> }
> }
>
> private void updateConnections() {
> String queueId = getQueueName();
> if (queueId.equals(TEMP_NAME))
> return; // take no action until setID() with real/final name
> try {
> qconnection = getConnectionFactory().createQueueConnection();
> qsession = qconnection.createQueueSession(false,
> Session.AUTO_ACKNOWLEDGE);
> // create a non-portable "queue identity"
> // or use jndiContext.lookup("dynamicQueues/"+getQueueName())
> // if (queue != null) queue.close();
> if (qsender != null) qsender.close();
> queue = qsession.createQueue(queueId);
>              qsender = qsession.createSender(queue); // mostly unused in My
> setMessageSelector(msgSelector);
> } catch (JMSException ex) {
> log.warn("Could not update Connection/Queue", ex);
> throw new IllegalStateException("Could not update Connection/Queue", ex);
> }
> }
>
> private QueueConnectionFactory connectionFactory;
> public QueueConnectionFactory getConnectionFactory() { return
> this.connectionFactory; }
>      @Autowired
> public void setConnectionFactory(QueueConnectionFactory connectionFactory)
> {
> this.connectionFactory = connectionFactory;
> updateConnections(); // sets qreceiver, et al.
> start();
> }
>
> private String msgSelector = ""; // empty selector same as null
> public String getMessageSelector() { return this.msgSelector; }
> public void setMessageSelector(String msgSelector) {
> this.msgSelector = msgSelector; // we could test for actual change of
> msgSelector...
> setListeners(); // update the qreceiver&  setListeners:
> }
>
> // Super-class [AbstractBroadcasterProxy] invokes start() in its
> constructor,
> // but that is before the connnetionFactory can be set.
> // [constructor injection would not help, as super(id) must be invoked
> before setting local fields]
> // so we interpose here to avoid setting the one-shot 'started'
> AtomicBoolean
> // and re-invoke start() after/when connectionFactory is set;
>      protected void start() {
> if (getConnectionFactory() == null) {
> return; // try again later...
> }
> super.start(); // set started, create BroadcastHandler and
> setListeners
> }
>
> private String getQueueName() {
> return getID(); // use id as queue name...
> // assert: (getID() == TEMP_NAME ? getQueueName() == TEMP_NAME : true)
> }
>
> // convenience method so we can keep queue name logic in one file.
> // send message to member on the Servlet(req)
> /**
> * Send a Comet event to a client application.
> *<p>
> * If you already know the client's queue name, you can:
> *<code>jmsTemplate.convertAndSend(queueName, message)</code>
> *
> * @param path the servletPath used by client comet request (identifies the
> application or domain)
> * @param member a myDS-authenticated Member, member.getUsername() is used.
> * @param message the String to be sent to the client as a comet event
> */
> public static void sendMessage(String path, Member member, String message)
> {
> String queueName = getQueueName(path, member);
> staticJmsTemplate.convertAndSend(queueName, message);
> }
>
> // create a receiver/listener for messages to this member on the
> Servlet(req)
> public static MyJMSBroadcaster newMyJMSBroadcaster(String path, Member
> member) {
> String queueName = getQueueName(path, member);
> return new MyJMSBroadcaster(queueName);
> }
>
> private static String getQueueName(String path, Member member) {
> //String path = req.getServletPath();
> return path+"/"+member.getUsername();
> }
>
> @Override
> public void setID(String id) {
> super.setID(id);
> updateConnections();
> }
>
>      public MyJMSBroadcaster() {
>          this(TEMP_NAME);
>      }
>
>      public MyJMSBroadcaster(String id) {
> // we would rather that Spring could inject ConnectionFactory, but for now
> we use self-service:
> // move ConnectionFactory from lastConnectionFactoryHolder to this:
> this(id, getQueueConnectionFactory());
>      }
>
> // if we can ever create this Broadcaster from Spring, supply the
> ConnectionFactory to be used:
> /**
> * @param id for broadcaster.getID()
> * @param queueConnectionFactory typically a cachingConnectionFactory from
> Spring.
> */
>      public MyJMSBroadcaster(String id, QueueConnectionFactory
> queueConnectionFactory) {
> super(id);   // set name, setID, empty BroadcasterCache, new
> BroadcasterConfig
> setConnectionFactory(queueConnectionFactory);
> }
>
> // might generalize this to listen to user's Queue and application's Topic
> // Q: is it better to have A) 1 "ALL" topic + 1000s of per-user queues +
> subtopic queues
> // or B) 1 APP queue with selection filters for "TO=${me} || TO=${ALL} ||
> TO=${TOPIC-7}"
> // if we can pre-filter when the target group is small (and so not send to
> each machine)
> // then plan B is simpler and more flexible.
> // else the sender needs to expand the 'group-members-list'
> // A) setSelector(String str...) {
> createReceiver(getQueueByName(str)).addListener();... }
> // B) setSelector(String str...) { msgSelector += "TO=$str || ...";
> getQ().createRec(mS).addListener(); }
> // We like (A); just send the DestinationNames (queue://user, topic://all,
> topic://game1)
> private void setListeners() {
> try {
> QueueReceiver qr = qreceiver; // save until new qreceiver is configured
> qreceiver = qsession.createReceiver(queue, msgSelector);
> if (qr != null)
> qr.close(); // kill old qreceiver;
>
> qreceiver.setMessageListener(messageListener);
> log.detail("set MessageListener for queue {}", queue);
>          } catch (JMSException ex) {
> log.warn("Could not update MessageListener for "+queue, ex);
>              throw new IllegalStateException("Could not create
> MessageListener for "+queue, ex);
>          }
> }
>
> // incomingBroadcast is the 'run()' method of the resulting
> BroadcastHandler:
> // invoked by ExecutorService when Broadcaster.start() {
> //    bc.getExecutorService().submit(getBroadcastHandler());
> // }
> // start() is called in ABP.constructor(id) [before msgSelector is set!]
> // rely that 'this.incomingBroadcast() is blocked in Executor until
> constructor has completed?!
> // note to self: maybe select on JMSTimestamp
> //
> // DefaultBroadcaster is way convoluted...
> // each call to broadcast(msg) calls: {start(); messages.offer(new
> Entry(newMsg,null,future,msg); return future;}
> // start() has a one-shot that calls
> executorService.submit(getBroadcastHandler())
> //     getBroadcastHandler() [misnomer] returns a Runnable [which is
> interesting for DefaultBroadcaster
> //     because it continues to re-submit itself to push(messages.take())]
> //     Hmm: the Entry msg holds a [Broadcaster]Future, and that future is
> done/canceled.
> //
> // push(Entry) iterates multiple resources on the Broadcaster; delivering
> to push(resource, finalMsg)
> // push(r,msg) checks lifecycle, then calls: broadcast(r,e(r,msg))
> // broadcast(r,e) finds the AtmosphereHandler and calls
> AtmosphereHandler.onStateChange(e);
> // presumably each Resource to which one is broadcasting can have its own
> Handler.
> // onStateChange(e) does e.resource.write(toJson(message))
>
> // but: once the JMS conection is started (which AbstractBroadcasterProxy
> does in contstuctor)
> // [we finesse all that, just doing staticJmsTemplate.convertAndSend(msg)]
> //
> // when a message is received from JMS we call:
> broadcastReceivedMessage(message)
> // broadcastReceivedMessage(message) calls push(new Entry(filter(message),
> null, Future, message));
> // push(Entry) ->  push(r,msg) ->
> broadcast(r,e)->handler.onStateChange(e)->e.r.write(toJson(message))
> // BUT: AbstractBroadcasterProxy sets writeLocally=false, so nothing is
> emitted!
>
>      /**
>       * {@inheritDoc}
>       */
>      @Override
>      public void incomingBroadcast() {
>          try {
>              qconnection.start();
>          } catch (JMSException ex) {
> log.warn("Unable to initialize BroadcastHandler", ex);
>              throw new IllegalStateException("Unable to initialize
> BroadcastHandler", ex);
>          }
>      }
>
> // AbstractBroadcasterProxy.broadcast(...) comes here; the subsequent
> push(Entry) has little effect
>
> // Forward message to this queue/topic/destination Listeners [ie:
> this.messageListener]
> // where it emerges on the set of resources assigned to this Broadcaster.
> // The application uses JmsTemplate to send to queue/topic.
> [MyJMSBroadcaster.sendMessage()]
> // Note: the Chat/demo code may call bc.broadcast("suspended") or some such
>      /**
>       * {@inheritDoc}
>       */
>      @Override
>      public void outgoingBroadcast(final Object message) {
>          try {
> // Note: JmsTemplate creates a producer for each send!
> finally{closeProducer()}
> // staticJmsTemplate.send(queue, new MessageCreator() {
> // Message createMessage(Session session) {
> // return session.createTextMessage(message.toString());
> // }
> // });
>              qsender.send(qsession.createTextMessage(message.toString()));
> //if (qos) producer.send(message, deliveryMode, priority, ttl);
>          } catch (JMSException ex) {
> log.warn("Unable to send outgoingBroadcast", ex);
>          }
>      }
> }
>
12
Loading...