DefaultBroadcaster

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

DefaultBroadcaster

Florian Hars-3
I am trying to subscribe each incoming event to one central Broadcaster
(for e.g. system messages). But the only Broadcaster atmosphere gives me
is the DefaultBroadcaster, which expects an AtmosphereEventImpl as
argument to addAtmosphereEvent, while all atmosphere gives me in the
onEvent method of my handler is an
atmosphereEvent<HttpServletRequest,HttpServletResponse>, which I can't
add to a DefaultBroadcaster without an explicit conversion or cast.

So shouldn't (sorry for the stupid line breaks inserted by thunderbird)

public class DefaultBroadcaster implements
Broadcaster<String,AtmosphereEventImpl> {

rather be something like

public class DefaultBroadcaster<E,F> implements
Broadcaster<String,AtmosphereEvent<E,F>> {

if you want to use atmosphere for anything except chatting with yourself?

Another thing I don't quite understand about DefaultBroadcaster is code like

72     public void broadcast(String msg) {
73          for (AtmosphereEventImpl event : events) {
74              try {
75                  event.setMessage(msg);
76                  broadcast(event);
77              } catch (IOException ex) {
78                  LoggerUtils.getLogger().log(Level.WARNING, "", ex);
79              }
80          }
81      }

which destructively updates event before passing it to another method.
Now if I understand the TwitterAtmosphereHandler example and the JavaEE
spec correctly, the same event may be added to several Broadcasters and
the container is free to execute the code in parallel threads, so what
is the convincing argument that this architecture is race free that I am
unaware of?

- Florian

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Jeanfrancois Arcand
Salut,

Florian Hars wrote:

> I am trying to subscribe each incoming event to one central Broadcaster
> (for e.g. system messages). But the only Broadcaster atmosphere gives me
> is the DefaultBroadcaster, which expects an AtmosphereEventImpl as
> argument to addAtmosphereEvent, while all atmosphere gives me in the
> onEvent method of my handler is an
> atmosphereEvent<HttpServletRequest,HttpServletResponse>, which I can't
> add to a DefaultBroadcaster without an explicit conversion or cast.
>
> So shouldn't (sorry for the stupid line breaks inserted by thunderbird)
>
> public class DefaultBroadcaster implements
> Broadcaster<String,AtmosphereEventImpl> {
>
> rather be something like
>
> public class DefaultBroadcaster<E,F> implements
> Broadcaster<String,AtmosphereEvent<E,F>> {
>
> if you want to use atmosphere for anything except chatting with yourself?

Hum....I've used String,AtmosphereEventImpl to make it simple to use and
avoid the user to itself type the constructor. You think it will be
simpler to not do that? Let me think of it..



>
> Another thing I don't quite understand about DefaultBroadcaster is code
> like
>
> 72     public void broadcast(String msg) {
> 73          for (AtmosphereEventImpl event : events) {
> 74              try {
> 75                  event.setMessage(msg);
> 76                  broadcast(event);
> 77              } catch (IOException ex) {
> 78                  LoggerUtils.getLogger().log(Level.WARNING, "", ex);
> 79              }
> 80          }
> 81      }
>
> which destructively updates event before passing it to another method.

Why destructively? It needs to set the message on the AtmosphereEvent
before it pust it to the AtmosphereHandler. Without that, the
broadcasted messages will never be set on that object.



> Now if I understand the TwitterAtmosphereHandler example and the JavaEE
> spec correctly, the same event may be added to several Broadcasters and
> the container is free to execute the code in parallel threads, so what
> is the convincing argument that this architecture is race free that I am
> unaware of?

This is not in the JavaEE spec. You are free to execute the Broadcaster
using the calling threads (the one from the container you are running
on) or use an ExecutorServices
(Broadcaster.getBroadcasterConfig().setExecutorService()) and use your
own thread pool to execute the push. Performance wise, it is of course
better to use an ExecutorService, but to make more trivial to debug I
don't set an executor by default.

Does it helps?

Thanks

-- Jeanfrancois



>
> - Florian
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Florian Hars-3
Jeanfrancois Arcand schrieb:
> Hum....I've used String,AtmosphereEventImpl to make it simple to use and
> avoid the user to itself type the constructor. You think it will be
> simpler to not do that? Let me think of it..

Maybe it is because I use scala which may be a bit more picky about its
parametric types, but it doesn't like your specific mixture of
AtmosphereEvent[E,F] and AtmophereEventImpl:

$ scala -cp atmosphere-portable-runtime-0.2-SNAPSHOT.jar:../sailfin/lib/javaee.jar
Welcome to Scala version 2.7.4.final (OpenJDK 64-Bit Server VM, Java 1.6.0_0).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.atmosphere.cpr._                                                                                                                                                                                                
import org.atmosphere.cpr._

scala> import javax.servlet.http._                                                                                                                                                                                                
import javax.servlet.http._

scala> trait MyHandler extends AtmosphereHandler[HttpServletRequest,HttpServletResponse] {
     |   def onEvent(ev: AtmosphereEvent[HttpServletRequest,HttpServletResponse]) = {    
     |     val bc = new DefaultBroadcaster(this)                                        
     |     bc.addAtmosphereEvent(ev)                                                    
     |   }
     | }
<console>:11: error: type mismatch;
 found   : org.atmosphere.cpr.AtmosphereEvent[javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse]
 required: org.atmosphere.cpr.AtmosphereEventImpl
           bc.addAtmosphereEvent(ev)
                                 ^

Maybe it would help to parametrize AtmosphereHandler not only by the type arguments
of the AtmosphereEvent,

  public interface AtmosphereHandler<F,G>

but also by the AtmosphereEvent itself

  public interface AtmosphereHandler<E extends AtmosphereEvent<F,G>,F,G>

Then you can do (again in the scala interpreter):

scala> trait AlternateHandler[E <: AtmosphereEvent[F,G],F,G] { def onEvent(ev: E) : E; def onMessage(ev: E) : E }
defined trait AlternateHandler

scala> trait AlternateBroadcaster[M,E <: AtmosphereEvent[F,G],F,G] {                                                                                                                                                      
     |   def addAtmosphereEvent(ev: E) { println("added " + ev) }                                                                                                                                                          
     |   def broadcast(msg: M): Unit
     | }
defined trait AlternateBroadcaster

scala> class AlternateDefaultBroadcaster(h: AlternateHandler[AtmosphereEventImpl,HttpServletRequest,HttpServletResponse]) extends AlternateBroadcaster[String,AtmosphereEventImpl,HttpServletRequest,HttpServletResponse] {
     |   def broadcast(msg: String) { println("Broadcast: " + msg) }
     | }
defined class AlternateDefaultBroadcaster

scala> trait MyHandler extends AlternateHandler[AtmosphereEventImpl,HttpServletRequest,HttpServletResponse] {                                                                                                              
     |   def onEvent(ev: AtmosphereEventImpl) = {                                                                                                                                                                          
     |     val bc = new AlternateDefaultBroadcaster(this)                                                                                                                                                                  
     |     bc.addAtmosphereEvent(ev)                                                                                                                                                                                      
     |     bc.broadcast("I want ocaml style functors")                                                                                                                                                                    
     |     ev
     |   }
     | }
defined trait MyHandler

The type signatures are a bit on the long side, but at least it looks consistent.

If this is worthwhile depends on whether you expect to do comet over other transports
than http. Otherwise you could just rename AtmosphereEventImpl to AtmosphereEvent and
hard code HttpServlet{Request,Response} everywhere and get rid of most type parameters.

Or keep the generic versions, rename AtmosphereEventImpl to

  public class DefaultEvent implements AtmosphereEvent<HttpServletRequest,HttpServletResponse> { ...

add a new

  public interface DefaultHandler extends AtmosphereHandler<DefaultEvent,HttpServletRequest,HttpServletResponse>

and change DefaultBroadcaster to

  public class DefaultBroadcaster(DefaultHandler hander) implements AtmosphereHander<String,DefaultEvent,HttpServletRequest,HttpServletResponse> { ...

Then everybody will use the Default* classes, and you still keep the flexibility
do change things if somebody invents a new protocol that is stupid enough to
need comet for bidirectional connections.

Or something like this.

- Florian

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Jeanfrancois Arcand
Salut,

Florian Hars wrote:

> Jeanfrancois Arcand schrieb:
>> Hum....I've used String,AtmosphereEventImpl to make it simple to use and
>> avoid the user to itself type the constructor. You think it will be
>> simpler to not do that? Let me think of it..
>
> Maybe it is because I use scala which may be a bit more picky about its
> parametric types, but it doesn't like your specific mixture of
> AtmosphereEvent[E,F] and AtmophereEventImpl:
>
> $ scala -cp atmosphere-portable-runtime-0.2-SNAPSHOT.jar:../sailfin/lib/javaee.jar
> Welcome to Scala version 2.7.4.final (OpenJDK 64-Bit Server VM, Java 1.6.0_0).
> Type in expressions to have them evaluated.
> Type :help for more information.


I see...I just tried and you are right I need to fix that.


>
> scala> import org.atmosphere.cpr._                                                                                                                                                                                                
> import org.atmosphere.cpr._
>
> scala> import javax.servlet.http._                                                                                                                                                                                                
> import javax.servlet.http._
>
> scala> trait MyHandler extends AtmosphereHandler[HttpServletRequest,HttpServletResponse] {
>      |   def onEvent(ev: AtmosphereEvent[HttpServletRequest,HttpServletResponse]) = {    
>      |     val bc = new DefaultBroadcaster(this)                                        
>      |     bc.addAtmosphereEvent(ev)                                                    
>      |   }
>      | }
> <console>:11: error: type mismatch;
>  found   : org.atmosphere.cpr.AtmosphereEvent[javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse]
>  required: org.atmosphere.cpr.AtmosphereEventImpl
>            bc.addAtmosphereEvent(ev)
>                                  ^
>
> Maybe it would help to parametrize AtmosphereHandler not only by the type arguments
> of the AtmosphereEvent,
>
>   public interface AtmosphereHandler<F,G>
>
> but also by the AtmosphereEvent itself
>
>   public interface AtmosphereHandler<E extends AtmosphereEvent<F,G>,F,G>
>
> Then you can do (again in the scala interpreter):
>
> scala> trait AlternateHandler[E <: AtmosphereEvent[F,G],F,G] { def onEvent(ev: E) : E; def onMessage(ev: E) : E }
> defined trait AlternateHandler
>
> scala> trait AlternateBroadcaster[M,E <: AtmosphereEvent[F,G],F,G] {                                                                                                                                                      
>      |   def addAtmosphereEvent(ev: E) { println("added " + ev) }                                                                                                                                                          
>      |   def broadcast(msg: M): Unit
>      | }
> defined trait AlternateBroadcaster
>
> scala> class AlternateDefaultBroadcaster(h: AlternateHandler[AtmosphereEventImpl,HttpServletRequest,HttpServletResponse]) extends AlternateBroadcaster[String,AtmosphereEventImpl,HttpServletRequest,HttpServletResponse] {
>      |   def broadcast(msg: String) { println("Broadcast: " + msg) }
>      | }
> defined class AlternateDefaultBroadcaster
>
> scala> trait MyHandler extends AlternateHandler[AtmosphereEventImpl,HttpServletRequest,HttpServletResponse] {                                                                                                              
>      |   def onEvent(ev: AtmosphereEventImpl) = {                                                                                                                                                                          
>      |     val bc = new AlternateDefaultBroadcaster(this)                                                                                                                                                                  
>      |     bc.addAtmosphereEvent(ev)                                                                                                                                                                                      
>      |     bc.broadcast("I want ocaml style functors")                                                                                                                                                                    
>      |     ev
>      |   }
>      | }
> defined trait MyHandler
>
> The type signatures are a bit on the long side, but at least it looks consistent.
>
> If this is worthwhile depends on whether you expect to do comet over other transports
> than http. Otherwise you could just rename AtmosphereEventImpl to AtmosphereEvent and
> hard code HttpServlet{Request,Response} everywhere and get rid of most type parameters.

I was planning to add support for framework like Netty and that's the
main reason why I was using parameters.

>
> Or keep the generic versions, rename AtmosphereEventImpl to
>
>   public class DefaultEvent implements AtmosphereEvent<HttpServletRequest,HttpServletResponse> { ...
>
> add a new
>
>   public interface DefaultHandler extends AtmosphereHandler<DefaultEvent,HttpServletRequest,HttpServletResponse>
>
> and change DefaultBroadcaster to
>
>   public class DefaultBroadcaster(DefaultHandler hander) implements AtmosphereHander<String,DefaultEvent,HttpServletRequest,HttpServletResponse> { ...
>
> Then everybody will use the Default* classes, and you still keep the flexibility
> do change things if somebody invents a new protocol that is stupid enough to
> need comet for bidirectional connections.
>
> Or something like this.

I like the idea. Let me work on this today.

Thanks!

-- Jeanfrancois




>
> - Florian
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Florian Hars-3
In reply to this post by Jeanfrancois Arcand
Jeanfrancois Arcand wrote:
> This is not in the JavaEE spec. You are free to execute the Broadcaster
> using the calling threads (the one from the container you are running
> on) or use an ExecutorServices

But the problem seems to be the Broadcaster.broadcast(String ...)
methods like the one I cited in my mail. If you call one of these from
onEvent, which is called by the AtmosphereServlet, several container
threads may be executing the code at the same time, and the
synchronization on the MessageHandler starts only later.

Now that I have a working CometSupport, I can actually test this. I
started my simplified chat handler and opened two get requests with

$ nc localhost 8080 | tee log1
GET /chat/msgs/ HTTP/1.1
Host: localhost

Then I posted 60000 different messages using six parallel processes (all
in one line, of course):

$ for i in 1 2 3 4 5 6; do (for j in `seq 1 10`; do wget
--post-data="action=post&message=MSG%20$i%20$j' -q --no-http-keep-alive
http://localhost:8080/chat/msgs/ -O -; done) & done

And 60000 messages were deliverd to the listening connection:

$ grep MSG log1 | wc -l
60000

But not 60000 different messages:

$ grep MSG log1 | sort -u | wc -l
59075
$ grep MSG log1 | sort |uniq -c | sort -n | tail
       2 MSG 6 9363 Fri Jun 05 20:17:15 CEST 2009
       2 MSG 6 9507 Fri Jun 05 20:17:20 CEST 2009
       2 MSG 6 9529 Fri Jun 05 20:17:20 CEST 2009
       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009

And not the same ones for both listening connections:

$ grep MSG log2 | sort -u | wc -l
58735
$ grep MSG log2 | sort |uniq -c | sort -n | tail
       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
       3 MSG 2 8572 Fri Jun 05 20:16:52 CEST 2009
       3 MSG 3 1550 Fri Jun 05 20:14:32 CEST 2009
       3 MSG 6 5935 Fri Jun 05 20:15:58 CEST 2009
       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009

That is why the destructive update performed by event.setMessage is bad:
once another thread overwrites the message set by another thread, the
original message is forever destroyed.

I've got a hunch that there is something like a ConcurrentLinkedQueue
for the messages missing somewhere.

- Florian.

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Jeanfrancois Arcand
Salut,

Florian Hars wrote:

> Jeanfrancois Arcand wrote:
>> This is not in the JavaEE spec. You are free to execute the
>> Broadcaster using the calling threads (the one from the container you
>> are running on) or use an ExecutorServices
>
> But the problem seems to be the Broadcaster.broadcast(String ...)
> methods like the one I cited in my mail. If you call one of these from
> onEvent, which is called by the AtmosphereServlet, several container
> threads may be executing the code at the same time, and the
> synchronization on the MessageHandler starts only later.
>
> Now that I have a working CometSupport, I can actually test this. I
> started my simplified chat handler and opened two get requests with
>
> $ nc localhost 8080 | tee log1
> GET /chat/msgs/ HTTP/1.1
> Host: localhost
>
> Then I posted 60000 different messages using six parallel processes (all
> in one line, of course):
>
> $ for i in 1 2 3 4 5 6; do (for j in `seq 1 10`; do wget
> --post-data="action=post&message=MSG%20$i%20$j' -q --no-http-keep-alive
> http://localhost:8080/chat/msgs/ -O -; done) & done
>
> And 60000 messages were deliverd to the listening connection:
>
> $ grep MSG log1 | wc -l
> 60000
>
> But not 60000 different messages:
>
> $ grep MSG log1 | sort -u | wc -l
> 59075
> $ grep MSG log1 | sort |uniq -c | sort -n | tail
>       2 MSG 6 9363 Fri Jun 05 20:17:15 CEST 2009
>       2 MSG 6 9507 Fri Jun 05 20:17:20 CEST 2009
>       2 MSG 6 9529 Fri Jun 05 20:17:20 CEST 2009
>       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
>       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
>       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
>       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
>       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
>       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
>       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009
>
> And not the same ones for both listening connections:
>
> $ grep MSG log2 | sort -u | wc -l
> 58735
> $ grep MSG log2 | sort |uniq -c | sort -n | tail
>       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
>       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
>       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
>       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
>       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
>       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
>       3 MSG 2 8572 Fri Jun 05 20:16:52 CEST 2009
>       3 MSG 3 1550 Fri Jun 05 20:14:32 CEST 2009
>       3 MSG 6 5935 Fri Jun 05 20:15:58 CEST 2009
>       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009
>
> That is why the destructive update performed by event.setMessage is bad:
> once another thread overwrites the message set by another thread, the
> original message is forever destroyed.

Bingo. I do see the problem now. Let me work on this ASAP.


>
> I've got a hunch that there is something like a ConcurrentLinkedQueue
> for the messages missing somewhere.


Let me think about it. I should have something ready in the hour or so.

Many many thanks!

-- Jeanfrancois


>
> - Florian.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Jeanfrancois Arcand
Salut,

this is fixed now and I'm uploading the new artifact (should be there
soon). Let me know how it goes. I will now look at the Scala issue you
pointed out.

Thanks for the help so far!

-- Jeanfrancois

Jeanfrancois Arcand wrote:

> Salut,
>
> Florian Hars wrote:
>> Jeanfrancois Arcand wrote:
>>> This is not in the JavaEE spec. You are free to execute the
>>> Broadcaster using the calling threads (the one from the container you
>>> are running on) or use an ExecutorServices
>>
>> But the problem seems to be the Broadcaster.broadcast(String ...)
>> methods like the one I cited in my mail. If you call one of these from
>> onEvent, which is called by the AtmosphereServlet, several container
>> threads may be executing the code at the same time, and the
>> synchronization on the MessageHandler starts only later.
>>
>> Now that I have a working CometSupport, I can actually test this. I
>> started my simplified chat handler and opened two get requests with
>>
>> $ nc localhost 8080 | tee log1
>> GET /chat/msgs/ HTTP/1.1
>> Host: localhost
>>
>> Then I posted 60000 different messages using six parallel processes
>> (all in one line, of course):
>>
>> $ for i in 1 2 3 4 5 6; do (for j in `seq 1 10`; do wget
>> --post-data="action=post&message=MSG%20$i%20$j' -q
>> --no-http-keep-alive http://localhost:8080/chat/msgs/ -O -; done) & done
>>
>> And 60000 messages were deliverd to the listening connection:
>>
>> $ grep MSG log1 | wc -l
>> 60000
>>
>> But not 60000 different messages:
>>
>> $ grep MSG log1 | sort -u | wc -l
>> 59075
>> $ grep MSG log1 | sort |uniq -c | sort -n | tail
>>       2 MSG 6 9363 Fri Jun 05 20:17:15 CEST 2009
>>       2 MSG 6 9507 Fri Jun 05 20:17:20 CEST 2009
>>       2 MSG 6 9529 Fri Jun 05 20:17:20 CEST 2009
>>       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
>>       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
>>       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
>>       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
>>       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
>>       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
>>       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009
>>
>> And not the same ones for both listening connections:
>>
>> $ grep MSG log2 | sort -u | wc -l
>> 58735
>> $ grep MSG log2 | sort |uniq -c | sort -n | tail
>>       2 MSG 6 9561 Fri Jun 05 20:17:21 CEST 2009
>>       2 MSG 6 9679 Fri Jun 05 20:17:23 CEST 2009
>>       2 MSG 6 9750 Fri Jun 05 20:17:24 CEST 2009
>>       2 MSG 6 9751 Fri Jun 05 20:17:24 CEST 2009
>>       2 MSG 6 9899 Fri Jun 05 20:17:26 CEST 2009
>>       3 MSG 2 77 Fri Jun 05 20:14:05 CEST 2009
>>       3 MSG 2 8572 Fri Jun 05 20:16:52 CEST 2009
>>       3 MSG 3 1550 Fri Jun 05 20:14:32 CEST 2009
>>       3 MSG 6 5935 Fri Jun 05 20:15:58 CEST 2009
>>       4 MSG 6 9444 Fri Jun 05 20:17:18 CEST 2009
>>
>> That is why the destructive update performed by event.setMessage is bad:
>> once another thread overwrites the message set by another thread, the
>> original message is forever destroyed.
>
> Bingo. I do see the problem now. Let me work on this ASAP.
>
>
>>
>> I've got a hunch that there is something like a ConcurrentLinkedQueue
>> for the messages missing somewhere.
>
>
> Let me think about it. I should have something ready in the hour or so.
>
> Many many thanks!
>
> -- Jeanfrancois
>
>
>>
>> - Florian.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [hidden email]
>> For additional commands, e-mail: [hidden email]
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Florian Hars-3
Jeanfrancois Arcand schrieb:
> this is fixed now and I'm uploading the new artifact (should be there
> soon). Let me know how it goes.

Looks OK, I haven't been able to break it.

But the comments in BroadcasterConfig are now out of date:

     * Set an {@link ExecutorService} which can be used to dispatch
     * {@link AtmosphereEvent}. Set it to null if broadcast must be executed
     * using the calling thread (this is the default).

And while skimming through BroadcasterConfig, doesn't this just apply the last
transform:

     Object transform(F object){
         F transformed = object;
         for (MessageTransformer<F> mf: transformers){
-             transformed = mf.transform(object);
+             transformed = mf.transform(transformed);
         }
         return transformed;
     }


- Florian.

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: DefaultBroadcaster

Jeanfrancois Arcand
Salut,

Florian Hars wrote:
> Jeanfrancois Arcand schrieb:
>> this is fixed now and I'm uploading the new artifact (should be there
>> soon). Let me know how it goes.
>
> Looks OK, I haven't been able to break it.

Thanks for testing! I will release 0.2 M2 today or tomorrow. I just need
to work on a sample for the resume operations (both for -cpr and -core).

>
> But the comments in BroadcasterConfig are now out of date:
>
>      * Set an {@link ExecutorService} which can be used to dispatch
>      * {@link AtmosphereEvent}. Set it to null if broadcast must be executed
>      * using the calling thread (this is the default).
>
> And while skimming through BroadcasterConfig, doesn't this just apply the last
> transform:
>
>      Object transform(F object){
>          F transformed = object;
>          for (MessageTransformer<F> mf: transformers){
> -             transformed = mf.transform(object);
> +             transformed = mf.transform(transformed);
>          }
>          return transformed;
>      }

Rrrr many Thanks! I need to build that community and have other commiter
that can review what I'm doing to avoid such stupid mistake. Patch applied!

Thanks

-- Jeanfrancois

>
>
> - Florian.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Using Atmosphere with Scala ( was: DefaultBroadcaster)

Jeanfrancois Arcand
In reply to this post by Florian Hars-3
Salut,

[cut ...]
> Then everybody will use the Default* classes, and you still keep the flexibility
> do change things if somebody invents a new protocol that is stupid enough to
> need comet for bidirectional connections.

"Stupid enough"?

The idea here is to not only support Servlet, but any framework that
support Async HTTP. A good example is Grizzly and Netty, which both
supports HTTP. My goal is to comes with a commons set of interface for
such framework so you can run Atmosphere on top of them (on top of any
NIO/Web Framework). Since they don't support the Servlet spec, I needed
a way to make this happens.

I've just fixed the issue with Scala. Now I need to learn how to test
this on top of a container :-). Try it and let me know if you like the
solution.

I'm still unsure if I wan to expose the AtmosphereEventImpl.setMessage
as public, but I think it can be useful to set the message directly
(except you shortcut the MessageTransformer API).

Any feedback appreciated!

Thanks

-- Jeanfrancois


>
> Or something like this.
>
> - Florian
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Florian Hars-3
Jeanfrancois Arcand schrieb:
> "Stupid enough"?
>
> The idea here is to not only support Servlet, but any framework that
> support Async HTTP.

Ah, you want to abstract over the container, not over the transport protocol.
That is indeed worthwhile.

> I've just fixed the issue with Scala. Now I need to learn how to test
> this on top of a container :-).

Just put scala-library.jar in WEB-INF/lib.

> Try it and let me know if you like the solution.

It compiles with fewer casts, but event#getBroadcaster is still
declared as returning a raw Broadcaster, leading to the rather
weird type error:

   [scalac]  found   : java.lang.String("Foo\012")
   [scalac]  required: ?0 where type ?0
   [scalac]         broadcaster.broadcast("Foo\n")
   [scalac]                               ^

So I still have to do event.getBroadcaster().asInstanceOf[Broadcaster[String]]
or so.

And somehow the war on dev.java.net seems to suffer from an incomplete
rebuild, all I get is:

java.lang.NoSuchMethodError: org.atmosphere.cpr.Broadcaster.addAtmosphereEvent(Lorg/atmosphere/cpr/AtmosphereEvent;)Z
        at org.atmosphere.cpr.AtmosphereEventImpl.suspend(AtmosphereEventImpl.java:136)
        at org.atmosphere.cpr.AtmosphereEventImpl.suspend(AtmosphereEventImpl.java:126)
        at org.atmosphere.samples.chat.ChatAtmosphereHandler.onEvent(ChatAtmosphereHandler.java:94)
        at org.atmosphere.cpr.AsynchronousProcessor.action(AsynchronousProcessor.java:113)


- Florian.

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Jeanfrancois Arcand
Salut,

Florian Hars wrote:

> Jeanfrancois Arcand schrieb:
>> "Stupid enough"?
>>
>> The idea here is to not only support Servlet, but any framework that
>> support Async HTTP.
>
> Ah, you want to abstract over the container, not over the transport protocol.
> That is indeed worthwhile.
>
>> I've just fixed the issue with Scala. Now I need to learn how to test
>> this on top of a container :-).
>
> Just put scala-library.jar in WEB-INF/lib.

OK Thanks. I'm also looking at Lift to see what I can do.

>
>> Try it and let me know if you like the solution.
>
> It compiles with fewer casts, but event#getBroadcaster is still
> declared as returning a raw Broadcaster, leading to the rather
> weird type error:
>
>    [scalac]  found   : java.lang.String("Foo\012")
>    [scalac]  required: ?0 where type ?0
>    [scalac]         broadcaster.broadcast("Foo\n")
>    [scalac]                               ^
>
> So I still have to do event.getBroadcaster().asInstanceOf[Broadcaster[String]]
> or so.

Really stupid question: how do you compile using Scala? I never played
with it...

>
> And somehow the war on dev.java.net seems to suffer from an incomplete
> rebuild, all I get is:
>
> java.lang.NoSuchMethodError: org.atmosphere.cpr.Broadcaster.addAtmosphereEvent(Lorg/atmosphere/cpr/AtmosphereEvent;)Z
>         at org.atmosphere.cpr.AtmosphereEventImpl.suspend(AtmosphereEventImpl.java:136)
>         at org.atmosphere.cpr.AtmosphereEventImpl.suspend(AtmosphereEventImpl.java:126)
>         at org.atmosphere.samples.chat.ChatAtmosphereHandler.onEvent(ChatAtmosphereHandler.java:94)
>         at org.atmosphere.cpr.AsynchronousProcessor.action(AsynchronousProcessor.java:113)

Grrrrr. I will fix that ASAP. Apology for wasting your time.

Thanks,

-- Jeanfrancois



>
>
> - Florian.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Florian Hars-3
Jeanfrancois Arcand wrote:
> Really stupid question: how do you compile using Scala? I never played
> with it...

You just call scalac instead of (or, in case of a mixed project, before)
javac. I build with ant, all that is required is

   <target name="init">
...
     <taskdef resource="scala/tools/ant/antlib.xml"
             classpath="${scala-library.jar}:${scala-compiler.jar}"/>
   </target>

   <path id="project.classpath">
     <pathelement location="${scala-library.jar}" />
...
   </path>


and

   <target name="build" depends="init">
     <scalac srcdir="src/main" destdir="${build.dir}/classes"
             classpathref="project.classpath"
            deprecation="on"
            />
     <javac srcdir="src/main" destdir="${build.dir}/classes"
             classpathref="project.classpath"
            deprecation="on"
            />
   </target>

In the end you have to copy the library to your deliverables:

   <target name="war" depends="build">
     <war destfile="${build.dir}/chat.war" webxml="./etc/web.xml">
       <lib file="${scala-library.jar}"/>
...
     </war>
   </target>

There are also a maven, eclipse and netbeans plugins which I have not
really worked with.

- Florian

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Florian Hars-3
In reply to this post by Jeanfrancois Arcand
Jeanfrancois Arcand wrote:
> I'm still unsure if I wan to expose the AtmosphereEventImpl.setMessage
> as public, but I think it can be useful to set the message directly

The very *existence* of that method is problematic. Your last fix did
only work because you delegated everything to a SingleThreadExecutor. I
have changed my code to call

bc.setExecutorService(java.util.concurrent.Executors.newFixedThreadPool(6))

and the problem is still there:

$ grep MSG log2 | sort | uniq -c | wc -l
55870
$ grep MSG log2 | sort | uniq -c | sort -n | tail
       5 MSG 6 5194 Tue Jun 09 20:25:01 CEST 2009
       5 MSG 6 5807 Tue Jun 09 20:25:31 CEST 2009
       5 MSG 6 6240 Tue Jun 09 20:25:52 CEST 2009
       5 MSG 6 8059 Tue Jun 09 20:27:19 CEST 2009
       6 MSG 2 2104 Tue Jun 09 20:22:41 CEST 2009
       6 MSG 4 8326 Tue Jun 09 20:27:30 CEST 2009
       6 MSG 5 959 Tue Jun 09 20:21:55 CEST 2009
       7 MSG 4 2577 Tue Jun 09 20:23:05 CEST 2009
       8 MSG 6 5690 Tue Jun 09 20:25:25 CEST 2009
      12 MSG 4 4 Tue Jun 09 20:21:16 CEST 2009

Actually, it is worse than before. I don't quite understand why,
but this may be related (in DefaultBroadcaster#start()):

         if (!started.get()){
             started.set(true);

is not atomic, it should be:

         if (!started.getAndSet(true)) {

- Florian.

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Jeanfrancois Arcand
In reply to this post by Florian Hars-3
Thanks for the info. I will give it a try after I completed the @resume
sample.

-- Jeanfrancois

Florian Hars wrote:

> Jeanfrancois Arcand wrote:
>> Really stupid question: how do you compile using Scala? I never played
>> with it...
>
> You just call scalac instead of (or, in case of a mixed project, before)
> javac. I build with ant, all that is required is
>
>   <target name="init">
> ...
>     <taskdef resource="scala/tools/ant/antlib.xml"
>          classpath="${scala-library.jar}:${scala-compiler.jar}"/>
>   </target>
>
>   <path id="project.classpath">
>     <pathelement location="${scala-library.jar}" />
> ...
>   </path>
>
>
> and
>
>   <target name="build" depends="init">
>     <scalac srcdir="src/main" destdir="${build.dir}/classes"
>             classpathref="project.classpath"
>         deprecation="on"
>         />
>     <javac srcdir="src/main" destdir="${build.dir}/classes"
>             classpathref="project.classpath"
>         deprecation="on"
>         />
>   </target>
>
> In the end you have to copy the library to your deliverables:
>
>   <target name="war" depends="build">
>     <war destfile="${build.dir}/chat.war" webxml="./etc/web.xml">
>       <lib file="${scala-library.jar}"/>
> ...
>     </war>
>   </target>
>
> There are also a maven, eclipse and netbeans plugins which I have not
> really worked with.
>
> - Florian
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Jeanfrancois Arcand
In reply to this post by Florian Hars-3
Salut,

Florian Hars wrote:

> Jeanfrancois Arcand wrote:
>> I'm still unsure if I wan to expose the AtmosphereEventImpl.setMessage
>> as public, but I think it can be useful to set the message directly
>
> The very *existence* of that method is problematic. Your last fix did
> only work because you delegated everything to a SingleThreadExecutor. I
> have changed my code to call
>
> bc.setExecutorService(java.util.concurrent.Executors.newFixedThreadPool(6))
>
> and the problem is still there:
>
> $ grep MSG log2 | sort | uniq -c | wc -l
> 55870
> $ grep MSG log2 | sort | uniq -c | sort -n | tail
>       5 MSG 6 5194 Tue Jun 09 20:25:01 CEST 2009
>       5 MSG 6 5807 Tue Jun 09 20:25:31 CEST 2009
>       5 MSG 6 6240 Tue Jun 09 20:25:52 CEST 2009
>       5 MSG 6 8059 Tue Jun 09 20:27:19 CEST 2009
>       6 MSG 2 2104 Tue Jun 09 20:22:41 CEST 2009
>       6 MSG 4 8326 Tue Jun 09 20:27:30 CEST 2009
>       6 MSG 5 959 Tue Jun 09 20:21:55 CEST 2009
>       7 MSG 4 2577 Tue Jun 09 20:23:05 CEST 2009
>       8 MSG 6 5690 Tue Jun 09 20:25:25 CEST 2009
>      12 MSG 4 4 Tue Jun 09 20:21:16 CEST 2009
>
> Actually, it is worse than before. I don't quite understand why,
> but this may be related (in DefaultBroadcaster#start()):
>
>         if (!started.get()){
>             started.set(true);
>
> is not atomic, it should be:
>
>         if (!started.getAndSet(true)) {

Ok that's what you get when you work from JavaOne. I will work on that
ASAP. Many thanks as usual for the help.

-- Jeanfrancois


>
> - Florian.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Atmosphere with Scala ( was: DefaultBroadcaster)

Jeanfrancois Arcand
Salut,

this is fixed now. I will do more testing for sure:

Sending
atmosphere/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java
Transmitting file data .
Committed revision 255.

-- Jeanfrancois

Jeanfrancois Arcand wrote:

> Salut,
>
> Florian Hars wrote:
>> Jeanfrancois Arcand wrote:
>>> I'm still unsure if I wan to expose the
>>> AtmosphereEventImpl.setMessage as public, but I think it can be
>>> useful to set the message directly
>>
>> The very *existence* of that method is problematic. Your last fix did
>> only work because you delegated everything to a SingleThreadExecutor. I
>> have changed my code to call
>>
>> bc.setExecutorService(java.util.concurrent.Executors.newFixedThreadPool(6))
>>
>>
>> and the problem is still there:
>>
>> $ grep MSG log2 | sort | uniq -c | wc -l
>> 55870
>> $ grep MSG log2 | sort | uniq -c | sort -n | tail
>>       5 MSG 6 5194 Tue Jun 09 20:25:01 CEST 2009
>>       5 MSG 6 5807 Tue Jun 09 20:25:31 CEST 2009
>>       5 MSG 6 6240 Tue Jun 09 20:25:52 CEST 2009
>>       5 MSG 6 8059 Tue Jun 09 20:27:19 CEST 2009
>>       6 MSG 2 2104 Tue Jun 09 20:22:41 CEST 2009
>>       6 MSG 4 8326 Tue Jun 09 20:27:30 CEST 2009
>>       6 MSG 5 959 Tue Jun 09 20:21:55 CEST 2009
>>       7 MSG 4 2577 Tue Jun 09 20:23:05 CEST 2009
>>       8 MSG 6 5690 Tue Jun 09 20:25:25 CEST 2009
>>      12 MSG 4 4 Tue Jun 09 20:21:16 CEST 2009
>>
>> Actually, it is worse than before. I don't quite understand why,
>> but this may be related (in DefaultBroadcaster#start()):
>>
>>         if (!started.get()){
>>             started.set(true);
>>
>> is not atomic, it should be:
>>
>>         if (!started.getAndSet(true)) {
>
> Ok that's what you get when you work from JavaOne. I will work on that
> ASAP. Many thanks as usual for the help.
>
> -- Jeanfrancois
>
>
>>
>> - Florian.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [hidden email]
>> For additional commands, e-mail: [hidden email]
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]