<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 TRANSITIONAL//EN">
<HTML>
<HEAD>
  <META HTTP-EQUIV="Content-Type" CONTENT="text/html; CHARSET=UTF-8">
  <META NAME="GENERATOR" CONTENT="GtkHTML/4.2.3">
</HEAD>
<BODY>
Hi,<BR>
<BR>
As part of some work to get the maximum performance out of Kamailio RLS and Presence I would like to be able to hand-off processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes.&nbsp; This way, even if large numbers of presence requests are received on a single TCP connection, all that the Kamailio process managing that connection has to do is to de-packetise the requests and hand them off.<BR>
<BR>
I have been looking at the ASYNC module and it doesn't seem to do quite the right thing for this.&nbsp; The issue is the sleep parameter.&nbsp; I want my worker processes to process presence requests continuously from a queue, not suspend them for a second or more and then process them.&nbsp; Is there a simple change to ASYNC to get it to do this?<BR>
<BR>
I currently do what I need in a quite different way (see config fragment below), but I think using ASYNC would be cleaner if it did what I needed...
<BLOCKQUOTE>
<PRE>
...
# ----- mqueue params -----
modparam(&quot;mqueue&quot;, &quot;mqueue&quot;, &quot;name=presence&quot;)

# ----- rtimer params -----
modparam(&quot;rtimer&quot;, &quot;timer&quot;, &quot;name=presence0;interval=1;mode=1;&quot;)
modparam(&quot;rtimer&quot;, &quot;exec&quot;, &quot;timer=presence0;route=PRESENCE_PROCESS&quot;)
modparam(&quot;rtimer&quot;, &quot;timer&quot;, &quot;name=presence1;interval=1;mode=1;&quot;)
modparam(&quot;rtimer&quot;, &quot;exec&quot;, &quot;timer=presence1;route=PRESENCE_PROCESS&quot;)
...
modparam(&quot;rtimer&quot;, &quot;timer&quot;, &quot;name=presence7;interval=1;mode=1;&quot;)
modparam(&quot;rtimer&quot;, &quot;exec&quot;, &quot;timer=presence7;route=PRESENCE_PROCESS&quot;)
...
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (@event!=&quot;presence&quot; &amp;&amp; @event!=&quot;presence.winfo&quot;) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(ev_type) = @event;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;&nbsp; $rm $var(ev_type) not a presence message for myself\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!t_suspend()) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply(&quot;500&quot;, &quot;Server Internal Error&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_ERR&quot;, &quot;Failed to suspend transaction for $rm\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;Suspended transaction for $rm [$T(id_index):$T(id_label)]\n&quot;);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!mq_add(&quot;presence&quot;, &quot;$T(id_index)&quot;, &quot;$T(id_label)&quot;)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply(&quot;500&quot;, &quot;Server Internal Error&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_ERR&quot;, &quot;Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
...
route[PRESENCE_PROCESS] {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; lock(&quot;pres&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(pres) = $shv(pres);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $shv(pres) = $shv(pres) + 1;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; unlock(&quot;pres&quot;);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_WARN&quot;, &quot;Starting presence de-queue process $var(pres) (pid: $pp)\n&quot;);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (1) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (mq_fetch(&quot;presence&quot;)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(id_index) = (int) $mqk(presence);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(id_label) = (int) $mqv(presence);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;Found queued presence transaction [$var(id_index):$var(id_label)]\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_continue(&quot;$var(id_index)&quot;, &quot;$var(id_label)&quot;, &quot;PRESENCE&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; usleep(100000);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
}

route[PRESENCE] {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;$rm: route[PRESENCE] process $var(pres)\n&quot;);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (is_method(&quot;NOTIFY&quot;)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;Sending NOTIFY to RLS\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; rls_handle_notify();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else if (is_method(&quot;PUBLISH&quot;)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;Sending PUBLISH to Presence\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; handle_publish();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else if (is_method(&quot;SUBSCRIBE&quot;)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;Sending SUBSCRIBE to RLS\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $var(ret_code) = rls_handle_subscribe();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if ($var(ret_code) == 10) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_INFO&quot;, &quot;&nbsp; SUBSCRIBE not for RLS - sending to Presence\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; handle_subscribe();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } else {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; xlog(&quot;L_ERR&quot;, &quot;Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; t_reply(&quot;500&quot;, &quot;Server Internal Error&quot;);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit;
}
</PRE>
</BLOCKQUOTE>
<BR>
Thanks,<BR>
<BR>
Peter<BR>
<BR>
<TABLE CELLSPACING="0" CELLPADDING="0" WIDTH="100%">
<TR>
<TD>
<PRE>
-- 
Peter Dunkley
Technical Director
Crocodile RCS Ltd
</PRE>
</TD>
</TR>
</TABLE>
</BODY>
</HTML>