<html>
  <head>
    <meta content="text/html; charset=ISO-8859-1"
      http-equiv="Content-Type">
  </head>
  <body bgcolor="#FFFFFF" text="#000000">
    Hello,<br>
    <br>
    On 3/16/12 12:33 PM, Peter Dunkley wrote:
    <blockquote cite="mid:1331897595.3074.17.camel@pd-laptop-linux"
      type="cite">
      <meta http-equiv="Content-Type" content="text/html;
        charset=ISO-8859-1">
      <meta name="GENERATOR" content="GtkHTML/4.2.3">
      Hi,<br>
      <br>
      As part of some work to get the maximum performance out of
      Kamailio RLS and Presence I would like to be able to hand-off
      processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE)
      to some worker processes.&nbsp; This way, even if large numbers of
      presence requests are received on a single TCP connection, all
      that the Kamailio process managing that connection has to do is to
      de-packetise the requests and hand them off.<br>
      <br>
      I have been looking at the ASYNC module and it doesn't seem to do
      quite the right thing for this.&nbsp; The issue is the sleep
      parameter.&nbsp; I want my worker processes to process presence
      requests continuously from a queue, not suspend them for a second
      or more and then process them.&nbsp; Is there a simple change to ASYNC
      to get it to do this?<br>
      <br>
      I currently do what I need in a quite different way (see config
      fragment below), but I think using ASYNC would be cleaner if it
      did what I needed...
      <blockquote>
        <pre>...
# ----- mqueue params -----
modparam("mqueue", "mqueue", "name=presence")

# ----- rtimer params -----
modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS")
modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS")
...
modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS")
...
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (@event!="presence" &amp;&amp; @event!="presence.winfo") {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(ev_type) = @event;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "&nbsp; $rm $var(ev_type) not a presence message for myself\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!t_suspend()) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply("500", "Server Internal Error");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_ERR", "Failed to suspend transaction for $rm\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply("500", "Server Internal Error");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
...
route[PRESENCE_PROCESS] {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; lock("pres");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(pres) = $shv(pres);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $shv(pres) = $shv(pres) + 1;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; unlock("pres");

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n");

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (1) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (mq_fetch("presence")) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(id_index) = (int) $mqk(presence);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(id_label) = (int) $mqv(presence);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; usleep(100000);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
}

route[PRESENCE] {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n");

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (is_method("NOTIFY")) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "Sending NOTIFY to RLS\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; rls_handle_notify();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else if (is_method("PUBLISH")) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "Sending PUBLISH to Presence\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; handle_publish();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else if (is_method("SUBSCRIBE")) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(ret_code) = rls_handle_subscribe();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if ($var(ret_code) == 10) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_INFO", "&nbsp; SUBSCRIBE not for RLS - sending to Presence\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; handle_subscribe();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply("500", "Server Internal Error");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
}
</pre>
      </blockquote>
      <br>
    </blockquote>
    just for the records, rtimer can do now micro-second timers -- that
    should lower waiting time for your example.<br>
    <br>
    It would be good to have also an 'immediate' execution of a route in
    async module, kind of embedding the above config functionality --
    having a list of tasks in shared memory (id_index, id_label) and a
    pool of workers checking and consuming it -- it will make config
    simpler for such needs, otherwise, the functionality can be achieved
    right now pretty much completely just via config.<br>
    <br>
    Cheers,<br>
    Daniel<br>
    <blockquote cite="mid:1331897595.3074.17.camel@pd-laptop-linux"
      type="cite">
    </blockquote>
    <br>
    <pre class="moz-signature" cols="72">-- 
Daniel-Constantin Mierla
Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
<a class="moz-txt-link-freetext" href="http://www.asipto.com/index.php/kamailio-advanced-training/">http://www.asipto.com/index.php/kamailio-advanced-training/</a></pre>
  </body>
</html>