/[openfoncier]/branches/suivi-app-to-obj/services/outgoing/messagesender.php
ViewVC logotype

Contents of /branches/suivi-app-to-obj/services/outgoing/messagesender.php

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4167 - (show annotations)
Tue May 17 07:45:26 2016 UTC (8 years, 8 months ago) by fmichon
File size: 6891 byte(s)
* Création d'une branche pour déplacer le code de app/ qui concerne le suivi
  des demandes d'avis dans les classes métier concernées.
  

1 <?php
2
3 /**
4 * Ce fichier permet de créer un consumeur des messages stockées dans la
5 * pile de RabbitMQ réservée au ERP.
6 *
7 * @package openfoncier
8 */
9
10 include_once('../../core/om_debug.inc.php');
11 include_once('../../dyn/services.inc.php');
12
13 (defined("DEBUG") ? "" : define("DEBUG", EXTRA_VERBOSE_MODE));
14
15 include_once('../../core/om_logger.class.php');
16
17 // inclusion de la librairie php-amqplib
18 require_once('../php/php-amqplib/vendor/autoload.php');
19 use PhpAmqpLib\Connection\AMQPConnection;
20
21
22 // Si le AMQP_DEBUG est defini a true, les messages DEBUG sont envoyes aux
23 // stdout
24 if (DEBUG > PRODUCTION_MODE) {
25 define('AMQP_DEBUG', true);
26 }
27
28
29 /** Utilise pour le test de retour de requete curl
30 */
31 define('HTTP_RESULT_OK', '200');
32
33
34 /**
35 * Fait le parse d'une chaine de characters recu comme
36 * reponse a une requette curl
37 * @param string $return La chaine de caracteres contenant un tableau
38 * en format JSON
39 * @return mixed Tableau associative contenant ce qui se trouve en
40 * format JSON dans l'entree
41 */
42 function parse($return)
43 {
44 // remplace les '{' et '}' par chaine vide
45 $return = str_replace('{', '', $return);
46 $return = str_replace('}', '', $return);
47 // trimme les espaces et tabs
48 $return = trim($return, " \t");
49 // remplace les retours de ligne
50 $return = preg_replace('/\s\s+/', '', $return);
51 // remplace les '"' par chaine vide
52 $return = str_replace("\"", "", $return);
53 // remplace ": " par ":"
54 $return = str_replace(": ", ":", $return);
55
56 $result = array();
57 // obtiens les (clef => valeur) paires
58 $resparray = explode(',', $return);
59
60 // si on a trouve des (clef => valeur) paires
61 if ($resparray) {
62 foreach ($resparray as $resp) {
63 // obtiens le clef et la valeur
64 $keyvalue = explode(':', $resp);
65 // stocke le clef et la valeur dans le $result
66 $result[trim(str_replace('"', '',$keyvalue[0]))] =
67 trim(str_replace('"', '', $keyvalue[1]));
68 }
69 }
70 // retourne le tableau associative
71 return $result;
72 }
73
74
75 /**
76 * Utilise curl pour faire POST a la service/messages d'ERP, et
77 * log le resultat dans le fichier de log
78 * @param string $msg Objet de la classe AMQPMessage
79 */
80 function postViaCurl($msg) {
81 $data = $msg->body;
82
83 global $ERP_URL_MESSAGES;
84 // si l'url de resource services/messages d'ERP est vide, ne traite pas
85 // le message recu
86 if (empty($ERP_URL_MESSAGES)) {
87 logger::instance()->cleanLog();
88 logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE);
89 logger::instance()->writeLogToFile();
90 return;
91 }
92
93 // cree une resource curl et set les options pour l'envoi d'un POST
94 $curl_resource = curl_init($ERP_URL_MESSAGES);
95 // set la methode d'envoi, POST
96 curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST");
97 // set le message a envoyer
98 curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body);
99 // une reponse est attendue
100 curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true);
101 // set le header
102 curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array(
103 'Content-Type: application/json',
104 'Content-Length: ' . strlen($msg->body))
105 );
106
107 // execute la requete curl et obtiens le resultat de la requete
108 $result = curl_exec($curl_resource);
109 // ferme la resource curl
110 curl_close($curl_resource);
111
112
113 // le resultat recu par curl est une chaine des caracteres qui contient
114 // un tableau JSON, donc appelle parse pour obtenir un tableau associative
115 $result = parse($result);
116
117 // log le retour de curl dans le fichier de log
118 logger::instance()->cleanLog();
119 logger::instance()->log(date('d/m/Y H:i'). ': '.
120 $result['http_code_message'] . ' ' .
121 $result['message'].' Message: '.
122 $msg->body, EXTRA_VERBOSE_MODE);
123 logger::instance()->writeLogToFile();
124
125 // si le resultat du curl est OK, fais une accuse d'acception pour le message
126 if ($result['http_code'] == HTTP_RESULT_OK) {
127 // accuse de reception du message
128 $msg->delivery_info['channel']->
129 basic_ack($msg->delivery_info['delivery_tag']);
130 } else {
131 // le resultat de l'execution de curl indique un erreur
132 // dors un peu avant de refuser le message
133 sleep(2);
134 // refuse le message
135 $msg->delivery_info['channel']->basic_reject(
136 $msg->delivery_info['delivery_tag'], true);
137
138 }
139 // envoie un message avec la chaine "quit" pour ce consumeur
140 if ($msg->body === 'quit') {
141 $msg->delivery_info['channel']->
142 basic_cancel($msg->delivery_info['consumer_tag']);
143 }
144
145 }
146
147
148
149 // pour des besoins de log il faut set $_SERVER["REQUEST_URI"]
150 if (!isset($_SERVER["REQUEST_URI"])) {
151 $_SERVER["REQUEST_URI"] = __FILE__;
152 }
153
154 // si le $argv[1] manque on ne peut pas cree un tag unique pour le
155 // consumeur et dans ce cas-la on termine l'execution
156 $suffix = '';
157 if (!empty($argv) && count($argv) > 1) {
158 $suffix = $argv[1];
159 } else {
160 logger::instance()->log('Missing the command line argument used for tag suffix',
161 EXTRA_VERBOSE_MODE);
162 logger::instance()->writeLogToFile();
163 exit;
164 }
165
166 // un exchange fait le routage des messages vers les piles
167 $exchange = 'router';
168 // on va utiliser la pile $ERP_QUEUE
169 $queue = $ERP_QUEUE;
170 // l'identifiant unique de consumeur ; il est utilise par RabbitMQ
171 $consumer_tag = 'consumer'.$suffix;
172
173 // etablis une connexion avec RabbitMQ instance
174 $conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT,
175 $ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD,
176 $ERP_CONNECTION_VHOST);
177
178 // obtiens un canal de la connexion
179 $ch = $conn->channel();
180 // si le canal n'est pas disponible arretes l'execution
181 if (is_null($ch)) {
182 // probleme avec la connexion, log le fait et arrete l'execution
183 logger::instance()->log('Probleme dans l\'etablisement de la connexion',
184 EXTRA_VERBOSE_MODE);
185 logger::instance()->writeLogToFile();
186 exit;
187 }
188
189 // declare une pile (la creer s'il faut)
190 $ch->queue_declare($queue, false, true, false, false);
191
192 // declare un exchange qui va diriger les messages
193 $ch->exchange_declare($exchange, 'direct', false, true, false);
194
195 // relier la pile d'ERP au exchange
196 $ch->queue_bind($queue, $exchange);
197
198 // registre le callback pour ce consumer aupres de RabbitMQ
199 $ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl');
200
201 // loop as long as the channel has callbacks registered
202 while (count($ch->callbacks)) {
203 $ch->wait();
204 }
205
206
207 ?>

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26