1 |
<?php |
2 |
|
3 |
/** |
4 |
* Ce fichier permet de créer un consumeur des messages stockées dans la |
5 |
* pile de RabbitMQ réservée au ERP. |
6 |
* |
7 |
* @package openfoncier |
8 |
*/ |
9 |
|
10 |
include_once('../../core/om_debug.inc.php'); |
11 |
include_once('../../dyn/services.inc.php'); |
12 |
|
13 |
(defined("DEBUG") ? "" : define("DEBUG", EXTRA_VERBOSE_MODE)); |
14 |
|
15 |
include_once('../../core/om_logger.class.php'); |
16 |
|
17 |
// inclusion de la librairie php-amqplib |
18 |
require_once('../php/php-amqplib/vendor/autoload.php'); |
19 |
use PhpAmqpLib\Connection\AMQPConnection; |
20 |
|
21 |
|
22 |
// Si le AMQP_DEBUG est defini a true, les messages DEBUG sont envoyes aux |
23 |
// stdout |
24 |
if (DEBUG > PRODUCTION_MODE) { |
25 |
define('AMQP_DEBUG', true); |
26 |
} |
27 |
|
28 |
|
29 |
/** Utilise pour le test de retour de requete curl |
30 |
*/ |
31 |
define('HTTP_RESULT_OK', '200'); |
32 |
|
33 |
|
34 |
/** |
35 |
* Fait le parse d'une chaine de characters recu comme |
36 |
* reponse a une requette curl |
37 |
* @param string $return La chaine de caracteres contenant un tableau |
38 |
* en format JSON |
39 |
* @return mixed Tableau associative contenant ce qui se trouve en |
40 |
* format JSON dans l'entree |
41 |
*/ |
42 |
function parse($return) |
43 |
{ |
44 |
// remplace les '{' et '}' par chaine vide |
45 |
$return = str_replace('{', '', $return); |
46 |
$return = str_replace('}', '', $return); |
47 |
// trimme les espaces et tabs |
48 |
$return = trim($return, " \t"); |
49 |
// remplace les retours de ligne |
50 |
$return = preg_replace('/\s\s+/', '', $return); |
51 |
// remplace les '"' par chaine vide |
52 |
$return = str_replace("\"", "", $return); |
53 |
// remplace ": " par ":" |
54 |
$return = str_replace(": ", ":", $return); |
55 |
|
56 |
$result = array(); |
57 |
// obtiens les (clef => valeur) paires |
58 |
$resparray = explode(',', $return); |
59 |
|
60 |
// si on a trouve des (clef => valeur) paires |
61 |
if ($resparray) { |
62 |
foreach ($resparray as $resp) { |
63 |
// obtiens le clef et la valeur |
64 |
$keyvalue = explode(':', $resp); |
65 |
// stocke le clef et la valeur dans le $result |
66 |
$result[trim(str_replace('"', '',$keyvalue[0]))] = |
67 |
trim(str_replace('"', '', $keyvalue[1])); |
68 |
} |
69 |
} |
70 |
// retourne le tableau associative |
71 |
return $result; |
72 |
} |
73 |
|
74 |
|
75 |
/** |
76 |
* Utilise curl pour faire POST a la service/messages d'ERP, et |
77 |
* log le resultat dans le fichier de log |
78 |
* @param string $msg Objet de la classe AMQPMessage |
79 |
*/ |
80 |
function postViaCurl($msg) { |
81 |
$data = $msg->body; |
82 |
|
83 |
global $ERP_URL_MESSAGES; |
84 |
// si l'url de resource services/messages d'ERP est vide, ne traite pas |
85 |
// le message recu |
86 |
if (empty($ERP_URL_MESSAGES)) { |
87 |
logger::instance()->cleanLog(); |
88 |
logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE); |
89 |
logger::instance()->writeLogToFile(); |
90 |
return; |
91 |
} |
92 |
|
93 |
// cree une resource curl et set les options pour l'envoi d'un POST |
94 |
$curl_resource = curl_init($ERP_URL_MESSAGES); |
95 |
// set la methode d'envoi, POST |
96 |
curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST"); |
97 |
// set le message a envoyer |
98 |
curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body); |
99 |
// une reponse est attendue |
100 |
curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true); |
101 |
// set le header |
102 |
curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array( |
103 |
'Content-Type: application/json', |
104 |
'Content-Length: ' . strlen($msg->body)) |
105 |
); |
106 |
|
107 |
// execute la requete curl et obtiens le resultat de la requete |
108 |
$result = curl_exec($curl_resource); |
109 |
// ferme la resource curl |
110 |
curl_close($curl_resource); |
111 |
|
112 |
|
113 |
// le resultat recu par curl est une chaine des caracteres qui contient |
114 |
// un tableau JSON, donc appelle parse pour obtenir un tableau associative |
115 |
$result = parse($result); |
116 |
|
117 |
// log le retour de curl dans le fichier de log |
118 |
logger::instance()->cleanLog(); |
119 |
logger::instance()->log(date('d/m/Y H:i'). ': '. |
120 |
$result['http_code_message'] . ' ' . |
121 |
$result['message'].' Message: '. |
122 |
$msg->body, EXTRA_VERBOSE_MODE); |
123 |
logger::instance()->writeLogToFile(); |
124 |
|
125 |
// si le resultat du curl est OK, fais une accuse d'acception pour le message |
126 |
if ($result['http_code'] == HTTP_RESULT_OK) { |
127 |
// accuse de reception du message |
128 |
$msg->delivery_info['channel']-> |
129 |
basic_ack($msg->delivery_info['delivery_tag']); |
130 |
} else { |
131 |
// le resultat de l'execution de curl indique un erreur |
132 |
// dors un peu avant de refuser le message |
133 |
sleep(2); |
134 |
// refuse le message |
135 |
$msg->delivery_info['channel']->basic_reject( |
136 |
$msg->delivery_info['delivery_tag'], true); |
137 |
|
138 |
} |
139 |
// envoie un message avec la chaine "quit" pour ce consumeur |
140 |
if ($msg->body === 'quit') { |
141 |
$msg->delivery_info['channel']-> |
142 |
basic_cancel($msg->delivery_info['consumer_tag']); |
143 |
} |
144 |
|
145 |
} |
146 |
|
147 |
|
148 |
|
149 |
// pour des besoins de log il faut set $_SERVER["REQUEST_URI"] |
150 |
if (!isset($_SERVER["REQUEST_URI"])) { |
151 |
$_SERVER["REQUEST_URI"] = __FILE__; |
152 |
} |
153 |
|
154 |
// si le $argv[1] manque on ne peut pas cree un tag unique pour le |
155 |
// consumeur et dans ce cas-la on termine l'execution |
156 |
$suffix = ''; |
157 |
if (!empty($argv) && count($argv) > 1) { |
158 |
$suffix = $argv[1]; |
159 |
} else { |
160 |
logger::instance()->log('Missing the command line argument used for tag suffix', |
161 |
EXTRA_VERBOSE_MODE); |
162 |
logger::instance()->writeLogToFile(); |
163 |
exit; |
164 |
} |
165 |
|
166 |
// un exchange fait le routage des messages vers les piles |
167 |
$exchange = 'router'; |
168 |
// on va utiliser la pile $ERP_QUEUE |
169 |
$queue = $ERP_QUEUE; |
170 |
// l'identifiant unique de consumeur ; il est utilise par RabbitMQ |
171 |
$consumer_tag = 'consumer'.$suffix; |
172 |
|
173 |
// etablis une connexion avec RabbitMQ instance |
174 |
$conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT, |
175 |
$ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD, |
176 |
$ERP_CONNECTION_VHOST); |
177 |
|
178 |
// obtiens un canal de la connexion |
179 |
$ch = $conn->channel(); |
180 |
// si le canal n'est pas disponible arretes l'execution |
181 |
if (is_null($ch)) { |
182 |
// probleme avec la connexion, log le fait et arrete l'execution |
183 |
logger::instance()->log('Probleme dans l\'etablisement de la connexion', |
184 |
EXTRA_VERBOSE_MODE); |
185 |
logger::instance()->writeLogToFile(); |
186 |
exit; |
187 |
} |
188 |
|
189 |
// declare une pile (la creer s'il faut) |
190 |
$ch->queue_declare($queue, false, true, false, false); |
191 |
|
192 |
// declare un exchange qui va diriger les messages |
193 |
$ch->exchange_declare($exchange, 'direct', false, true, false); |
194 |
|
195 |
// relier la pile d'ERP au exchange |
196 |
$ch->queue_bind($queue, $exchange); |
197 |
|
198 |
// registre le callback pour ce consumer aupres de RabbitMQ |
199 |
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl'); |
200 |
|
201 |
// loop as long as the channel has callbacks registered |
202 |
while (count($ch->callbacks)) { |
203 |
$ch->wait(); |
204 |
} |
205 |
|
206 |
|
207 |
?> |