/[openfoncier]/trunk/services/outgoing/messagesender.php
ViewVC logotype

Diff of /trunk/services/outgoing/messagesender.php

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 961 by mlimic, Mon Dec 3 09:04:35 2012 UTC revision 1005 by fmichon, Thu Dec 6 15:48:07 2012 UTC
# Line 1  Line 1 
1  <?php  <?php
2    
3    /**
4     * Ce fichier permet de créer un consumeur des messages stockées dans la
5     *  pile de RabbitMQ réservée au ERP.
6     *
7     * @package openfoncier
8     */
9    
10  include_once('../../core/om_debug.inc.php');  include_once('../../core/om_debug.inc.php');
11  include_once('../../dyn/services.inc.php');  include_once('../../dyn/services.inc.php');
12    
# Line 7  include_once('../../dyn/services.inc.php Line 14  include_once('../../dyn/services.inc.php
14    
15  include_once('../../core/om_logger.class.php');  include_once('../../core/om_logger.class.php');
16    
17    // inclusion de la librairie php-amqplib
18  require_once('../php/php-amqplib/vendor/autoload.php');  require_once('../php/php-amqplib/vendor/autoload.php');
19  use PhpAmqpLib\Connection\AMQPConnection;  use PhpAmqpLib\Connection\AMQPConnection;
20    
# Line 18  if (DEBUG > PRODUCTION_MODE) { Line 26  if (DEBUG > PRODUCTION_MODE) {
26  }  }
27    
28    
29    /** Utilise pour le test de retour de requete curl
30     */
31  define('HTTP_RESULT_OK', '200');  define('HTTP_RESULT_OK', '200');
32    
33    
34    /**
35     * Fait le parse d'une chaine de characters recu comme
36     * reponse a une requette curl
37     * @param string $return La chaine de caracteres contenant un tableau
38     * en format JSON
39     * @return mixed Tableau associative contenant ce qui se trouve en
40     * format JSON dans l'entree
41     */
42  function parse($return)  function parse($return)
43  {  {
44      // remplace les '{' et '}' par chaine vide      // remplace les '{' et '}' par chaine vide
45      $return = str_replace('{', '', $return);      $return = str_replace('{', '', $return);
46      $return = str_replace('}', '', $return);      $return = str_replace('}', '', $return);
47        // trimme les espaces et tabs
48      $return = trim($return, " \t");      $return = trim($return, " \t");
49      // remplace les retours de ligne      // remplace les retours de ligne
50      $return = preg_replace('/\s\s+/', '', $return);      $return = preg_replace('/\s\s+/', '', $return);
51        // remplace les '"' par chaine vide
52      $return = str_replace("\"", "", $return);      $return = str_replace("\"", "", $return);
53        // remplace ": " par ":"
54      $return = str_replace(": ", ":", $return);      $return = str_replace(": ", ":", $return);
55            
56      $result = array();      $result = array();
57        // obtiens les (clef => valeur) paires
58      $resparray = explode(',', $return);      $resparray = explode(',', $return);
59    
60        // si on a trouve des (clef => valeur) paires
61      if ($resparray) {      if ($resparray) {
62          foreach ($resparray as $resp) {          foreach ($resparray as $resp) {
63              $keyvalue = explode(':', $resp);                          // obtiens le clef et la valeur
64                $keyvalue = explode(':', $resp);
65                // stocke le clef et la valeur dans le $result
66              $result[trim(str_replace('"', '',$keyvalue[0]))] =              $result[trim(str_replace('"', '',$keyvalue[0]))] =
67                  trim(str_replace('"', '', $keyvalue[1]));                  trim(str_replace('"', '', $keyvalue[1]));
68          }          }
69      }      }
70        // retourne le tableau associative
71      return $result;      return $result;
72  }  }
73    
74    
75    /**
76     * Utilise curl pour faire POST a la service/messages d'ERP, et
77     * log le resultat dans le fichier de log
78     * @param string $msg Objet de la classe AMQPMessage
79     */
80  function postViaCurl($msg) {  function postViaCurl($msg) {
81      $data = $msg->body;      $data = $msg->body;
82            
83      global $ERP_URL_MESSAGES;      global $ERP_URL_MESSAGES;
84        // si l'url de resource services/messages d'ERP est vide, ne traite pas
85        // le message recu
86      if (empty($ERP_URL_MESSAGES)) {      if (empty($ERP_URL_MESSAGES)) {
87          logger::instance()->cleanLog();          logger::instance()->cleanLog();
88          logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE);          logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE);
# Line 56  function postViaCurl($msg) { Line 90  function postViaCurl($msg) {
90          return;          return;
91      }          }    
92            
93        // cree une resource curl et set les options pour l'envoi d'un POST
94      $curl_resource = curl_init($ERP_URL_MESSAGES);      $curl_resource = curl_init($ERP_URL_MESSAGES);
95        // set la methode d'envoi, POST
96      curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST");      curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST");
97        // set le message a envoyer
98      curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body);      curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body);
99        // une reponse est attendue
100      curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true);      curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true);
101        // set le header
102      curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array(                                                                                curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array(                                                                          
103          'Content-Type: application/json',          'Content-Type: application/json',
104          'Content-Length: ' . strlen($msg->body))          'Content-Length: ' . strlen($msg->body))
105      );      );
106    
107        // execute la requete curl et obtiens le resultat de la requete
108      $result = curl_exec($curl_resource);      $result = curl_exec($curl_resource);
109        // ferme la resource curl
110      curl_close($curl_resource);      curl_close($curl_resource);
111    
112    
113      // le resultat recu par curl est une chaine des caracteres qui contient      // le resultat recu par curl est une chaine des caracteres qui contient
114      // un tableau      // un tableau JSON, donc appelle parse pour obtenir un tableau associative
115      $result = parse($result);      $result = parse($result);
116            
117      // log le retour      // log le retour de curl dans le fichier de log
118      logger::instance()->cleanLog();      logger::instance()->cleanLog();
119      logger::instance()->log($result['http_code_message'] . ': ' .      logger::instance()->log(date('d/m/Y H:i'). ': '.
120                                $result['http_code_message'] . ' ' .
121                              $result['message'].' Message: '.                              $result['message'].' Message: '.
122                              $msg->body, EXTRA_VERBOSE_MODE);                              $msg->body, EXTRA_VERBOSE_MODE);
123      logger::instance()->writeLogToFile();      logger::instance()->writeLogToFile();
124            
125        // si le resultat du curl est OK, fais une accuse d'acception pour le message
126      if ($result['http_code'] == HTTP_RESULT_OK) {      if ($result['http_code'] == HTTP_RESULT_OK) {
127          // accuse de reception du message              // accuse de reception du message    
128          $msg->delivery_info['channel']->          $msg->delivery_info['channel']->
129              basic_ack($msg->delivery_info['delivery_tag']);                          basic_ack($msg->delivery_info['delivery_tag']);            
130      } else {      } else {
131            // le resultat de l'execution de curl indique un erreur
132            // dors un peu avant de refuser le message
133          sleep(2);          sleep(2);
134          // accuse de reception du message              // refuse le message
135          $msg->delivery_info['channel']->basic_reject(          $msg->delivery_info['channel']->basic_reject(
136                              $msg->delivery_info['delivery_tag'], true);                              $msg->delivery_info['delivery_tag'], true);
137                            
138      }      }
139      // Envoie un message avec la chaine "quit" pour canceler le consumeur.      // envoie un message avec la chaine "quit" pour ce consumeur
140      if ($msg->body === 'quit') {      if ($msg->body === 'quit') {
141          $msg->delivery_info['channel']->          $msg->delivery_info['channel']->
142              basic_cancel($msg->delivery_info['consumer_tag']);              basic_cancel($msg->delivery_info['consumer_tag']);
# Line 101  function postViaCurl($msg) { Line 146  function postViaCurl($msg) {
146    
147    
148    
149  // pour des besoins de logging il faut setter $_SERVER["REQUEST_URI"]  // pour des besoins de log il faut set $_SERVER["REQUEST_URI"]
150  if (!isset($_SERVER["REQUEST_URI"])) {  if (!isset($_SERVER["REQUEST_URI"])) {
151      $_SERVER["REQUEST_URI"] = __FILE__;      $_SERVER["REQUEST_URI"] = __FILE__;
152  }  }
# Line 118  if (!empty($argv) && count($argv) > 1) { Line 163  if (!empty($argv) && count($argv) > 1) {
163      exit;      exit;
164  }  }
165    
166    // un exchange fait le routage des messages vers les piles
167  $exchange = 'router';  $exchange = 'router';
168    // on va utiliser la pile $ERP_QUEUE
169  $queue = $ERP_QUEUE;  $queue = $ERP_QUEUE;
170    // l'identifiant unique de consumeur ; il est utilise par RabbitMQ
171  $consumer_tag = 'consumer'.$suffix;  $consumer_tag = 'consumer'.$suffix;
172    
173    // etablis une connexion avec RabbitMQ instance
174  $conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT,  $conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT,
175                             $ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD,                             $ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD,
176                             $ERP_CONNECTION_VHOST);                             $ERP_CONNECTION_VHOST);
177                                                        
178    // obtiens un canal de la connexion
179  $ch = $conn->channel();  $ch = $conn->channel();
180    // si le canal n'est pas disponible arretes l'execution
181  if (is_null($ch)) {  if (is_null($ch)) {
182      // probleme avec la connexion, donc log le fait      // probleme avec la connexion, log le fait et arrete l'execution
183      logger::instance()->log('Probleme dans l\'etablisement de la connexion',      logger::instance()->log('Probleme dans l\'etablisement de la connexion',
184                              EXTRA_VERBOSE_MODE);                              EXTRA_VERBOSE_MODE);
185      logger::instance()->writeLogToFile();      logger::instance()->writeLogToFile();
186        exit;
187  }  }
188    
189    // declare une pile (la creer s'il faut)
190  $ch->queue_declare($queue, false, true, false, false);  $ch->queue_declare($queue, false, true, false, false);
191    
192    // declare un exchange qui va diriger les messages
193  $ch->exchange_declare($exchange, 'direct', false, true, false);  $ch->exchange_declare($exchange, 'direct', false, true, false);
194    
195    // relier la pile d'ERP au exchange
196  $ch->queue_bind($queue, $exchange);  $ch->queue_bind($queue, $exchange);
197    
198    // registre le callback pour ce consumer aupres de RabbitMQ
199  $ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl');  $ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl');
200    
201  function shutdown($ch, $conn) {  // loop as long as the channel has callbacks registered
     $ch->close();  
     $conn->close();  
 }  
 register_shutdown_function('shutdown', $ch, $conn);  
   
 // Loop as long as the channel has callbacks registered  
202  while (count($ch->callbacks)) {  while (count($ch->callbacks)) {
203      $ch->wait();      $ch->wait();
204  }  }

Legend:
Removed from v.961  
changed lines
  Added in v.1005

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26