Improve frame stack and add test
parent
79a3ca34c4
commit
8b0bbd4db2
45
stomp.c
45
stomp.c
|
@ -33,6 +33,36 @@
|
|||
ZEND_EXTERN_MODULE_GLOBALS(stomp);
|
||||
extern zend_class_entry *stomp_ce_exception;
|
||||
|
||||
/* {{{ DEBUG */
|
||||
#if PHP_DEBUG
|
||||
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
|
||||
php_printf("------ START FRAME ------\n");
|
||||
php_printf("%s\n", frame->command);
|
||||
/* Headers */
|
||||
if (frame->headers) {
|
||||
char *key;
|
||||
ulong pos;
|
||||
zend_hash_internal_pointer_reset(frame->headers);
|
||||
|
||||
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
|
||||
char *value = NULL;
|
||||
|
||||
php_printf("%s:", key);
|
||||
|
||||
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
|
||||
php_printf("%s", value);
|
||||
}
|
||||
|
||||
php_printf("\n");
|
||||
zend_hash_move_forward(frame->headers);
|
||||
}
|
||||
}
|
||||
php_printf("\n%s\n", frame->body);
|
||||
php_printf("------ END FRAME ------\n");
|
||||
}
|
||||
#endif
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_init
|
||||
*/
|
||||
stomp_t *stomp_init()
|
||||
|
@ -170,6 +200,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
|
|||
char error[1024];
|
||||
socklen_t size;
|
||||
struct timeval tv;
|
||||
int flag = 1;
|
||||
|
||||
if (stomp->host != NULL)
|
||||
{
|
||||
|
@ -190,7 +221,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
|
|||
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
int flag = 1;
|
||||
|
||||
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
|
||||
|
||||
size = sizeof(stomp->localaddr);
|
||||
|
@ -546,13 +577,13 @@ void stomp_free_frame(stomp_frame_t *frame)
|
|||
|
||||
/* {{{ stomp_read_frame
|
||||
*/
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
||||
stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
|
||||
{
|
||||
stomp_frame_t *f = NULL;
|
||||
char *cmd = NULL, *length_str = NULL;
|
||||
int length = 0;
|
||||
|
||||
if (stomp->frame_stack) {
|
||||
if (use_stack && stomp->frame_stack) {
|
||||
return stomp_frame_stack_shift(&stomp->frame_stack);
|
||||
}
|
||||
|
||||
|
@ -648,10 +679,9 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
|||
char *receipt = NULL;
|
||||
|
||||
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
|
||||
stomp_frame_stack_t *stack = NULL;
|
||||
success = 0;
|
||||
while (1) {
|
||||
stomp_frame_t *res = stomp_read_frame(stomp);
|
||||
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0);
|
||||
if (res) {
|
||||
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
|
||||
char *receipt_id = NULL;
|
||||
|
@ -663,7 +693,6 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
|||
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id);
|
||||
}
|
||||
stomp_free_frame(res);
|
||||
stomp->frame_stack = stack;
|
||||
return success;
|
||||
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
|
||||
char *error_msg = NULL;
|
||||
|
@ -671,13 +700,11 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
|||
stomp_set_error(stomp, error_msg, 0, "%s", res->body);
|
||||
}
|
||||
stomp_free_frame(res);
|
||||
stomp->frame_stack = stack;
|
||||
return success;
|
||||
} else {
|
||||
stomp_frame_stack_push(&stack, res);
|
||||
stomp_frame_stack_push(&stomp->frame_stack, res);
|
||||
}
|
||||
} else {
|
||||
stomp->frame_stack = stack;
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
|
3
stomp.h
3
stomp.h
|
@ -84,13 +84,14 @@ stomp_t *stomp_init();
|
|||
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
|
||||
void stomp_close(stomp_t *stomp);
|
||||
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *connection);
|
||||
stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack);
|
||||
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
|
||||
int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec);
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
|
||||
void stomp_free_frame(stomp_frame_t *frame);
|
||||
|
||||
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
|
||||
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
|
||||
#endif /* _STOMP_H_ */
|
||||
|
||||
/*
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
--TEST--
|
||||
Test stomp::send() - test send with receipt
|
||||
--SKIPIF--
|
||||
<?php
|
||||
if (!extension_loaded("stomp")) print "skip";
|
||||
if (!stomp_connect()) print "skip";
|
||||
?>
|
||||
--FILE--
|
||||
<?php
|
||||
$s = new Stomp();
|
||||
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
|
||||
?>
|
||||
--EXPECTF--
|
||||
bool(true)
|
|
@ -0,0 +1,26 @@
|
|||
--TEST--
|
||||
Test stomp::readFrame() - test frame stack
|
||||
--SKIPIF--
|
||||
<?php
|
||||
if (!extension_loaded("stomp")) print "skip";
|
||||
if (!stomp_connect()) print "skip";
|
||||
?>
|
||||
--FILE--
|
||||
<?php
|
||||
$s = new Stomp();
|
||||
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
|
||||
var_dump($s->readFrame()->body);
|
||||
var_dump($s->readFrame()->body);
|
||||
var_dump($s->readFrame()->body);
|
||||
?>
|
||||
--EXPECTF--
|
||||
bool(true)
|
||||
bool(true)
|
||||
bool(true)
|
||||
bool(true)
|
||||
string(8) "Message1"
|
||||
string(8) "Message2"
|
||||
string(8) "Message3"
|
Loading…
Reference in New Issue